如何使给定数量的 Promise 在 Nodejs 上同时运行?[复制]

提问人:Robson Luan 提问时间:11/6/2023 最后编辑:Robson Luan 更新时间:11/6/2023 访问量:109


因此,我正在创建这个抓取应用程序,它基本上将 REST API 用于服务器上的多个实体。主要功能只是:

let buffer = []
for(entity in entities){
   let data = await scrapingProcessForAnEntity(entity);

我过度简化了脚本,因为抓取过程及其存储方式无关紧要,关键是我有这个函数来获取并返回我在 Promise 中需要的所有信息。 问题是,由于有很多实体,我想让它能够一次为多个实体运行流程,一旦其中一个流程完成,一个新的流程就会取代它。我做了一些测试,试图使用一个 Promise 数组,但我无法弄清楚如何使完成的 Promise 退出数组。我也不能一次为所有实体运行该进程,因为服务器无法一次处理太多请求。如果我将其限制为 3~5 个实体,应该没问题。scrapingProcessForAnEntityPromise.race()Promise.all()

我想做的基本上是“在超市里有 3 个收银员的一条线”。一旦收银员有空(等于第一个 Promise 完成),下一个客户端就可以立即继续(输入 promise 数组)。


let promises = []
let buffer = []
async function run(){
   for(entity in entities){
      addPromise(() => scrapingProcessForAnEntity(entity))

async function addPromise(prom){
   if(promises.length >= 3){ //In this example I'm trying to make it run for up to 3 at a time
      await moveQueue()

async function moveQueue(){
   if(promises.length < 3){
   let data = await Promise.race(promises)
   promises.splice(promises.indexOf(data), 1) //<---- This is where I'm struggling at
   //how can I remove the finished promised from the array? 

将数据添加到缓冲区中不会直接在 Promise 本身中实现,因为其中涉及处理,我不确定同时添加 2 个 promise 的数据是否会导致问题。


JavaScript 节点.js 异步等待承诺


我还在下面的另一条评论中解释说,“如果我不从数组中删除已完成的承诺,Promise.race 将继续返回已经完成的承诺。我的问题是如何找到下一个完成的”
您正在使用返回最早解析数据的 ''await Promise.race'。假设您设法从数组中删除了解析的 promise。但是你永远不知道其他承诺什么时候解决,你打算如何从数组中删除它?这里的方法是完全错误的。
这个想法是,一旦 Promise 完成,它就会被删除,这就是作用。每当我尝试将新的 Promise 添加到已经满的数组中时,都会调用此函数。等待后的函数将添加另一个 promise。此过程将重复为 wait -> remove -> add,直到我到达最后 3 个实体。在这一点上,我有另一个函数,它使用 a 来解决所有剩余的 Promise。moveQueueaddPrommoveQueuePromise.all


您可以使用 map 方法执行多个 promise ,例如


// Function to chunk an array into smaller arrays
function chunkArray(array, chunkSize) {
  const chunks = [];
  for (let i = 0; i < array.length; i += chunkSize) {
    chunks.push(array.slice(i, i + chunkSize));
  return chunks;

// Assuming scrapingProcessForAnEntity returns a Promise
const scrapingProcessForAnEntity = (query) => {
  // Your asynchronous API call logic here
  return new Promise((resolve, reject) => {
    // Example asynchronous operation (replace with your actual API call)
    setTimeout(() => {
      console.log(`API call for query: ${query}`);
      resolve(`Data for query: ${query}`);
    }, 1000); // Simulated delay of 1 second

const entities = ['entity1', 'entity2', 'entity3', 'entity4', 'entity5', 'entity6'];

// Chunk the entities into arrays of size 3
const chunkedEntities = chunkArray(entities, 3);

// Map over the chunks and perform asynchronous operations using Promise.all
const allPromises = chunkedEntities.map(chunk => {
  const promiseQuery = chunk.map(entity => scrapingProcessForAnEntity(entity));
  return Promise.all(promiseQuery);

// Flatten the results if needed (if you want a flat array of fulfilled promises)
const fullfilledPromises = Promise.all(allPromises.flat());

fullfilledPromises.then(results => {
  console.log('All API calls completed:', results);
}).catch(error => {
  console.error('Error:', error);

这里 allEntities 将具有 promise 数组,而 Promise.all 方法将按时执行它


@RobsonLuan假设您有 10 个实体的数组,但服务器一次接受 5 个,因此您要做的是将实体分成 5 个,并使用 flat() 方法执行嵌套循环来获取该分块的承诺值
这将产生与另一个答案相同的结果,对吗?它将能够一次运行 5 个,但只有在所有这 5 个都完成后,这些进程才会“向前推进”。这是一个不错的解决方案,我已经实现了类似的东西,但我只是想寻找一种方法来添加/删除正在运行的 Promise。
那么你可以使用 Promise.race() 方法,就是这样
它确实适用于前一组进程,但是如果我不从数组中删除已完成的 promise,Promise.race 将继续返回已完成的 promise。我的问题是如何找到下一个完成的
let entities = [] // all entities
let batch = []; // holding current batch of promises
let buffer = []; // holding all the scraped data

function scrapingProcessForBatch(batch) {
   return Promise.all(batch.map(entity => scrapingProcessForAnEntity(entity)))

async function addBatch() {
   batch = [];
   if (!entities.length) return; // if no more entities, exit

   for (let i=0; i<3; i++) {
      let entity = entities.pop();
      if (entity) batch.push(entity);

   let data = await scrapingProcessForBatch(batch);

   await addBatch();

async function run() {
   await addBatch();
   // all data in buffer array now

这可确保您始终一次处理 3 个实体,一旦一个批次完成,就会有一个新实体取而代之。您可以根据自己的喜好进行调整。


Promise.all将返回所有 3 个 promise 的数据数组。你需要在推动时传播它。buffer.push(...data)
我目前对这个问题的工作解决方案是按照你说的去做。问题是,该过程对每个实体都没有一致的时间,这取决于它保存了多少数据。花费时间最长的可能长达 5 分钟。如果我使用 Promise.all,脚本在此期间只能从 3 个实体获取数据,而 Promise.race 可以获取更多数据。
我建议同时使用 Promise.all() 和 Promise.race() 的组合。您可以将实体分成 3 个批次,并使用 Promise.all() 同时运行这些批次。然后,一旦一个批处理完成,使用 Promise.race() 将其替换为新批处理。
    let tasks = []
    let buffer = []
    let runningTasks = 0

    async function run(){
       for(entity in entities){
          addPromise(() => scrapingProcessForAnEntity(entity))

    async function addPromise(prom){

    async function processQueue() {

       // [1] initally runningTasks = 0
       if (runningTasks === 3 || tasks.length === 0) {
       // [2] take 1st task and remove it from array
       const taskPromise = tasks[0]();

       // [3] executes 1st task, won't wait
       taskPromise.then((data) => {
          runningTasks -= 1;
          // Check again if there is anything to execute
       // [4] If running tasks is less than 3 and if tasks has more, execute again
       // If runningTasks already 3, Then no need to execute now, It will be handled in then() above
       if ( runningTasks < 3 && tasks.length > 0) {


tasks.slice(0, 1);什么也不做。你是说吗?shift()
class LimitedResourceExecutor {

  constructor(size=1) {
    this.#size = size
    this.#queue = []
    this.#running = 0
  get size() { return this.#size }

  // enqueue a task, then check if it should be run
  call(obj, func, ...args) {
    return new Promise((resolve, reject) => {
      this.#queue.push([resolve, reject, obj, func, args])

  // run a task if it can be run, check again afterwards
  async #runQueue() {
    if (this.#queue.length && this.#running < this.size) {
      const [resolve, reject, obj, func, args] = this.#queue.shift()
      try {
        const result = await func.call(obj, ...args)
      } catch (err) {
      } finally {

// From https://stackoverflow.com/a/39914235/240443
function sleep(ms) {
  return new Promise(resolve => setTimeout(resolve, ms))

// just a little test function
let id = 0
let start = new Date().getTime()
async function sleepAndLog(ms) {
  const thisId = ++id
  console.log(new Date() - start, thisId, "starting", ms)
  await sleep(ms)
  console.log(new Date() - start, thisId, "finished", ms)
  return thisId

// only two tasks in parallel
const lre = new LimitedResourceExecutor(2)
const timings = [4000, 1000, 1000, 2000, 3000, 1000]
// construct a promise list
const promises = timings.map(ms => lre.call(null, sleepAndLog, ms))
// Promise.all still works, results coming in the correct order
Promise.all(promises).then(results => {

检查控制台日志,查看同时执行的任务是否不超过两个。另请注意,即使任务 1 很长,任务 3 也会在任务 2 完成后立即执行,而无需等待任务 1,这是分块执行所获得的行为。