如何使给定数量的 Promise 在 Nodejs 上同时运行?[复制]

How can I make a given number of Promises run at the same time on Nodejs? [duplicate]

提问人:Robson Luan 提问时间:11/6/2023 最后编辑:Robson Luan 更新时间:11/6/2023 访问量:109

问:

因此,我正在创建这个抓取应用程序,它基本上将 REST API 用于服务器上的多个实体。主要功能只是:

let buffer = []
for(entity in entities){
   let data = await scrapingProcessForAnEntity(entity);
   buffer.push(data)
}

我过度简化了脚本,因为抓取过程及其存储方式无关紧要,关键是我有这个函数来获取并返回我在 Promise 中需要的所有信息。 问题是,由于有很多实体,我想让它能够一次为多个实体运行流程,一旦其中一个流程完成,一个新的流程就会取代它。我做了一些测试,试图使用一个 Promise 数组,但我无法弄清楚如何使完成的 Promise 退出数组。我也不能一次为所有实体运行该进程,因为服务器无法一次处理太多请求。如果我将其限制为 3~5 个实体,应该没问题。scrapingProcessForAnEntityPromise.race()Promise.all()

我想做的基本上是“在超市里有 3 个收银员的一条线”。一旦收银员有空(等于第一个 Promise 完成),下一个客户端就可以立即继续(输入 promise 数组)。

我目前的实现是:

let promises = []
let buffer = []
async function run(){
   for(entity in entities){
      addPromise(() => scrapingProcessForAnEntity(entity))
   }
}

async function addPromise(prom){
   if(promises.length >= 3){ //In this example I'm trying to make it run for up to 3 at a time
      await moveQueue()
   }
   promises.push(prom())
}

async function moveQueue(){
   if(promises.length < 3){
      return
   }
   let data = await Promise.race(promises)
   buffer.push(data)
   promises.splice(promises.indexOf(data), 1) //<---- This is where I'm struggling at
   //how can I remove the finished promised from the array? 
}

将数据添加到缓冲区中不会直接在 Promise 本身中实现,因为其中涉及处理,我不确定同时添加 2 个 promise 的数据是否会导致问题。

我还实施了一种方法来清除最后的所有承诺。我唯一的困难是如何找到数组内部的哪个承诺已经完成,以便可以替换它。

JavaScript 节点.js 异步等待承诺

评论

0赞 BadPiggie 11/6/2023
我的问题是,为什么只想从数组中删除已解决的承诺?是否要重试被拒绝的那个?如果没有,您可以简单地刷新数组,因为如果它解析或拒绝,则无需将其保留在那里,对吗?
1赞 Robson Luan 11/6/2023
我只想删除已完成的进程,无论是已解决的还是已拒绝的,因为实体进程完成所需的时间不是恒定的。因此,如果存在“一个大实体”,它不会阻止执行,它将在数组上保留一个插槽,但可以使用其他插槽,因为它们可能会更早完成。
1赞 Robson Luan 11/6/2023
我还在下面的另一条评论中解释说,“如果我不从数组中删除已完成的承诺,Promise.race 将继续返回已经完成的承诺。我的问题是如何找到下一个完成的”
0赞 BadPiggie 11/6/2023
您正在使用返回最早解析数据的 ''await Promise.race'。假设您设法从数组中删除了解析的 promise。但是你永远不知道其他承诺什么时候解决,你打算如何从数组中删除它?这里的方法是完全错误的。
1赞 Robson Luan 11/6/2023
这个想法是,一旦 Promise 完成,它就会被删除,这就是作用。每当我尝试将新的 Promise 添加到已经满的数组中时,都会调用此函数。等待后的函数将添加另一个 promise。此过程将重复为 wait -> remove -> add,直到我到达最后 3 个实体。在这一点上,我有另一个函数,它使用 a 来解决所有剩余的 Promise。moveQueueaddPrommoveQueuePromise.all

答:

0赞 Node Developer 11/6/2023 #1

您可以使用 map 方法执行多个 promise ,例如

在做出承诺之前,您可以对实体进行分块,您需要多少个实体

// Function to chunk an array into smaller arrays
function chunkArray(array, chunkSize) {
  const chunks = [];
  for (let i = 0; i < array.length; i += chunkSize) {
    chunks.push(array.slice(i, i + chunkSize));
  }
  return chunks;
}

// Assuming scrapingProcessForAnEntity returns a Promise
const scrapingProcessForAnEntity = (query) => {
  // Your asynchronous API call logic here
  return new Promise((resolve, reject) => {
    // Example asynchronous operation (replace with your actual API call)
    setTimeout(() => {
      console.log(`API call for query: ${query}`);
      resolve(`Data for query: ${query}`);
    }, 1000); // Simulated delay of 1 second
  });
};

const entities = ['entity1', 'entity2', 'entity3', 'entity4', 'entity5', 'entity6'];

// Chunk the entities into arrays of size 3
const chunkedEntities = chunkArray(entities, 3);

// Map over the chunks and perform asynchronous operations using Promise.all
const allPromises = chunkedEntities.map(chunk => {
  const promiseQuery = chunk.map(entity => scrapingProcessForAnEntity(entity));
  return Promise.all(promiseQuery);
});

// Flatten the results if needed (if you want a flat array of fulfilled promises)
const fullfilledPromises = Promise.all(allPromises.flat());

fullfilledPromises.then(results => {
  console.log('All API calls completed:', results);
}).catch(error => {
  console.error('Error:', error);
});

这里 allEntities 将具有 promise 数组,而 Promise.all 方法将按时执行它

评论

0赞 Robson Luan 11/6/2023
我将针对这个问题进行编辑,但我限制一次应该运行多少个请求的主要原因是因为服务器无法一次处理太多请求。
0赞 Node Developer 11/6/2023
@RobsonLuan假设您有 10 个实体的数组,但服务器一次接受 5 个,因此您要做的是将实体分成 5 个,并使用 flat() 方法执行嵌套循环来获取该分块的承诺值
0赞 Robson Luan 11/6/2023
这将产生与另一个答案相同的结果,对吗?它将能够一次运行 5 个,但只有在所有这 5 个都完成后,这些进程才会“向前推进”。这是一个不错的解决方案,我已经实现了类似的东西,但我只是想寻找一种方法来添加/删除正在运行的 Promise。
0赞 Node Developer 11/6/2023
那么你可以使用 Promise.race() 方法,就是这样
0赞 Robson Luan 11/6/2023
它确实适用于前一组进程,但是如果我不从数组中删除已完成的 promise,Promise.race 将继续返回已完成的 promise。我的问题是如何找到下一个完成的
1赞 Afzal K. 11/6/2023 #2
let entities = [] // all entities
let batch = []; // holding current batch of promises
let buffer = []; // holding all the scraped data

function scrapingProcessForBatch(batch) {
   return Promise.all(batch.map(entity => scrapingProcessForAnEntity(entity)))
}

async function addBatch() {
   batch = [];
   if (!entities.length) return; // if no more entities, exit

   for (let i=0; i<3; i++) {
      let entity = entities.pop();
      if (entity) batch.push(entity);
   }

   let data = await scrapingProcessForBatch(batch);
   buffer.push(...data);

   await addBatch();
}

async function run() {
   await addBatch();
   // all data in buffer array now
}

这可确保您始终一次处理 3 个实体,一旦一个批次完成,就会有一个新实体取而代之。您可以根据自己的喜好进行调整。

评论

0赞 BadPiggie 11/6/2023
Promise.all将返回所有 3 个 promise 的数据数组。你需要在推动时传播它。buffer.push(...data)
0赞 Robson Luan 11/6/2023
我目前对这个问题的工作解决方案是按照你说的去做。问题是,该过程对每个实体都没有一致的时间,这取决于它保存了多少数据。花费时间最长的可能长达 5 分钟。如果我使用 Promise.all,脚本在此期间只能从 3 个实体获取数据,而 Promise.race 可以获取更多数据。
1赞 Afzal K. 11/6/2023
我建议同时使用 Promise.all() 和 Promise.race() 的组合。您可以将实体分成 3 个批次,并使用 Promise.all() 同时运行这些批次。然后,一旦一个批处理完成,使用 Promise.race() 将其替换为新批处理。
1赞 BadPiggie 11/6/2023 #3

检查以下解决方案是否按您的需要工作。在这里,我保持队列不受限制,但控制执行阈值。tasks

    let tasks = []
    let buffer = []
    let runningTasks = 0

    async function run(){
       for(entity in entities){
          addPromise(() => scrapingProcessForAnEntity(entity))
       }
       
       processQueue();
    }

    async function addPromise(prom){
       tasks.push(prom)
    }

    async function processQueue() {

       // [1] initally runningTasks = 0
       if (runningTasks === 3 || tasks.length === 0) {
          return;
       }
       
       // [2] take 1st task and remove it from array
       const taskPromise = tasks[0]();
       tasks.shift();

       // [3] executes 1st task, won't wait
       taskPromise.then((data) => {
          buffer.push(data);
          runningTasks -= 1;
          
          // Check again if there is anything to execute
          processQueue();
       });
       
       // [4] If running tasks is less than 3 and if tasks has more, execute again
       // If runningTasks already 3, Then no need to execute now, It will be handled in then() above
       if ( runningTasks < 3 && tasks.length > 0) {
          processQueue();
       }
    }

评论

0赞 Bergi 11/6/2023
tasks.slice(0, 1);什么也不做。你是说吗?shift()
0赞 BadPiggie 11/6/2023
谢谢你的指出。我的意思是.从队列中删除。当然也可以使用shift键splice
0赞 Amadan 11/6/2023 #4

这比分块更有效。这个想法是计算当前正在执行的承诺,并仅在该计数低于限制时启动下一个承诺。

class LimitedResourceExecutor {
  #size
  #queue
  #running

  constructor(size=1) {
    this.#size = size
    this.#queue = []
    this.#running = 0
  }
  
  get size() { return this.#size }

  // enqueue a task, then check if it should be run
  call(obj, func, ...args) {
    return new Promise((resolve, reject) => {
      this.#queue.push([resolve, reject, obj, func, args])
      this.#runQueue()
    })
  }

  // run a task if it can be run, check again afterwards
  async #runQueue() {
    if (this.#queue.length && this.#running < this.size) {
      this.#running++
      const [resolve, reject, obj, func, args] = this.#queue.shift()
      try {
        const result = await func.call(obj, ...args)
        resolve(result)
      } catch (err) {
        reject(err)
      } finally {
        this.#running--
        this.#runQueue()
      }
    }
  }
}


// From https://stackoverflow.com/a/39914235/240443
function sleep(ms) {
  return new Promise(resolve => setTimeout(resolve, ms))
}

// just a little test function
let id = 0
let start = new Date().getTime()
async function sleepAndLog(ms) {
  const thisId = ++id
  console.log(new Date() - start, thisId, "starting", ms)
  await sleep(ms)
  console.log(new Date() - start, thisId, "finished", ms)
  return thisId
}

// only two tasks in parallel
const lre = new LimitedResourceExecutor(2)
const timings = [4000, 1000, 1000, 2000, 3000, 1000]
// construct a promise list
const promises = timings.map(ms => lre.call(null, sleepAndLog, ms))
// Promise.all still works, results coming in the correct order
Promise.all(promises).then(results => {
  console.log("DONE")
  console.log(JSON.stringify(results))
})

检查控制台日志,查看同时执行的任务是否不超过两个。另请注意,即使任务 1 很长,任务 3 也会在任务 2 完成后立即执行,而无需等待任务 1,这是分块执行所获得的行为。