提问人:Robson Luan 提问时间:11/6/2023 最后编辑:Robson Luan 更新时间:11/6/2023 访问量:109
如何使给定数量的 Promise 在 Nodejs 上同时运行?[复制]
How can I make a given number of Promises run at the same time on Nodejs? [duplicate]
问:
我正在编写一个抓取程序,它基本上是针对服务器上的多个实体调用 REST API。主要逻辑很简单:
// Sequential version: each entity is scraped only after the previous one
// has finished, so only one scraping process runs at a time.
// NOTE(review): `await` here needs an async function (or an ES module).
// NOTE(review): if `entities` is an array, `for...in` iterates its keys
// ("0", "1", ...) rather than its values, and `entity` is undeclared (an
// implicit global; a ReferenceError in strict mode). `for (const entity of
// entities)` is presumably what is meant.
let buffer = []
for(entity in entities){
let data = await scrapingProcessForAnEntity(entity);
buffer.push(data)
}
我过度简化了脚本,因为抓取过程及其存储方式无关紧要,关键是我有这个函数来获取并返回我在 Promise 中需要的所有信息。
问题是,由于实体很多,我希望能够同时为多个实体运行 `scrapingProcessForAnEntity`,并且一旦其中一个完成,就立即启动一个新的来取代它。我做了一些测试,尝试将 `Promise.race()` 和 `Promise.all()` 与一个 Promise 数组配合使用,但我无法弄清楚如何让已完成的 Promise 退出数组。我也不能一次为所有实体运行该过程,因为服务器无法同时处理太多请求。如果限制为同时处理 3~5 个实体,应该没有问题。
我想要的效果基本上就像超市里"一条队伍、3 个收银员":一旦某个收银员空闲(相当于第一个 Promise 完成),下一位顾客就可以立即上前(进入 Promise 数组)。
我目前的实现是:
// Asker's attempt, kept verbatim: keep at most 3 scraping promises in
// `promises` and replace one as soon as it settles. The NOTE(review)
// comments explain why it does not work as intended.
let promises = []
let buffer = []
// Queue one scraping task per entity.
// NOTE(review): `addPromise` is not awaited, so every call is started in the
// same tick and run() resolves before any scraping finishes.
// NOTE(review): if `entities` is an array, `for...in` yields its keys ("0",
// "1", ...), not the entities; `entity` is also undeclared (an implicit
// global, or a ReferenceError in strict mode). Factories that are only called
// after the `await` in addPromise read whatever value that shared `entity`
// holds by then.
async function run(){
for(entity in entities){
addPromise(() => scrapingProcessForAnEntity(entity))
}
}
// Start `prom` immediately while fewer than 3 promises are stored; otherwise
// wait in moveQueue() first.
// NOTE(review): every call made while 3 promises are stored awaits its own
// moveQueue(); all of them race the same stored promises, so they all resume
// when the first one settles, each pushes that same winning result into
// `buffer`, and each then pushes a new promise — the limit of 3 is not
// enforced for those calls.
async function addPromise(prom){
if(promises.length >= 3){ //In this example I'm trying to make it run for up to 3 at a time
await moveQueue()
}
promises.push(prom())
}
// Wait for the first stored promise to settle and store its result.
// NOTE(review): Promise.race settles with the winner's *value*, not the
// promise object (a promise never resolves to a promise), so
// `promises.indexOf(data)` is -1 and `splice(-1, 1)` removes the LAST element
// instead of the finished one. Racing wrappers such as
// `p.then(value => ({ p, value }))` would identify which promise finished.
// NOTE(review): a rejection from any stored promise rejects the race, and
// nothing here catches it.
async function moveQueue(){
if(promises.length < 3){
return
}
let data = await Promise.race(promises)
buffer.push(data)
promises.splice(promises.indexOf(data), 1) //<---- This is where I'm struggling at
//how can I remove the finished promise from the array?
}
我没有直接在 Promise 内部把数据添加到缓冲区,因为这一步还涉及一些处理,我不确定两个 Promise 同时添加数据是否会导致问题。
我还实现了一个在最后处理完所有剩余 Promise 的方法。我唯一的困难是如何找出数组中哪个 Promise 已经完成,以便替换它。
答:
您可以使用 map 方法执行多个 promise:在创建 promise 之前,先按每批所需的实体数量对实体进行分块,例如:
/**
 * Split an array into consecutive chunks of at most `chunkSize` elements.
 * The last chunk holds the remainder and may be shorter.
 *
 * @param {Array<*>} array - the array to split (not modified).
 * @param {number} chunkSize - maximum number of elements per chunk.
 * @returns {Array<Array<*>>} the chunks, in order; `[]` for an empty array.
 * @throws {RangeError} if `chunkSize` is not a positive integer (a size of 0
 *   or less would otherwise never advance the loop).
 */
function chunkArray(array, chunkSize) {
  if (!Number.isInteger(chunkSize) || chunkSize < 1) {
    throw new RangeError(`chunkSize must be a positive integer, got ${String(chunkSize)}`);
  }
  const chunks = [];
  for (let start = 0; start < array.length; start += chunkSize) {
    chunks.push(array.slice(start, start + chunkSize));
  }
  return chunks;
}
// Stand-in for the real scraper: after a simulated one-second API call it logs
// the query and resolves with a label for it. Replace the delay with the
// actual API call.
const scrapingProcessForAnEntity = async (query) => {
  // Simulated network latency of 1 second.
  await new Promise((wake) => setTimeout(wake, 1000));
  console.log(`API call for query: ${query}`);
  return `Data for query: ${query}`;
};
const entities = ['entity1', 'entity2', 'entity3', 'entity4', 'entity5', 'entity6'];
// Chunk the entities into arrays of size 3
const chunkedEntities = chunkArray(entities, 3);

/**
 * Scrape the chunks one after another. The entities of one chunk are scraped
 * in parallel, but the next chunk starts only after the whole current chunk
 * has settled, so at most one chunk's worth of requests is in flight.
 *
 * @param {Array<Array<*>>} chunks - entities grouped into chunks.
 * @returns {Promise<Array<*>>} flat array of results in entity order; rejects
 *   as soon as an entity fails, and later chunks are then not started.
 */
async function scrapeChunksInSequence(chunks) {
  const results = [];
  for (const chunk of chunks) {
    // Calling scrapingProcessForAnEntity starts its work (the example
    // implementation schedules its timer immediately), so the promises must be
    // created here, one chunk at a time. Mapping every chunk up front would
    // start all requests at once.
    const chunkResults = await Promise.all(chunk.map((entity) => scrapingProcessForAnEntity(entity)));
    results.push(...chunkResults);
  }
  return results;
}

scrapeChunksInSequence(chunkedEntities)
  .then((results) => {
    console.log('All API calls completed:', results);
  })
  .catch((error) => {
    console.error('Error:', error);
  });
这里对每个分块调用 `Promise.all`,它会等待该分块中的所有 promise 完成,并以数组形式返回它们的结果。
评论
let entities = []; // all entities still waiting to be scraped
let batch = []; // entities of the batch currently being scraped
let buffer = []; // all the scraped data, in the same order as `entities`

/**
 * Scrape every entity of one batch in parallel.
 *
 * @param {Array<*>} batch - entities to scrape together.
 * @returns {Promise<Array<*>>} results in the same order as `batch`; rejects
 *   as soon as any entity fails.
 */
function scrapingProcessForBatch(batch) {
  return Promise.all(batch.map((entity) => scrapingProcessForAnEntity(entity)));
}

/**
 * Drain `entities` in batches of `batchSize`. Each batch runs in parallel, and
 * the next batch starts only after the whole current batch has finished.
 * Consumes `entities` and appends every result to `buffer`.
 *
 * @param {number} [batchSize=3] - number of entities scraped at the same time.
 * @throws {RangeError} (as a rejection) if `batchSize` is not a positive
 *   integer; a size of 0 would otherwise never empty `entities`.
 */
async function addBatch(batchSize = 3) {
  if (!Number.isInteger(batchSize) || batchSize < 1) {
    throw new RangeError(`batchSize must be a positive integer, got ${String(batchSize)}`);
  }
  // A loop instead of recursion: no growing chain of pending addBatch() calls.
  while (entities.length > 0) {
    // Take from the front so `buffer` keeps the original entity order. Unlike
    // a truthiness check, splice also keeps falsy entities such as 0 or ''.
    batch = entities.splice(0, batchSize);
    const data = await scrapingProcessForBatch(batch);
    buffer.push(...data);
  }
  batch = [];
}

/** Scrape all entities; once this resolves, every result is in `buffer`. */
async function run() {
  await addBatch();
  // all data in buffer array now
}
这可确保一次最多处理 3 个实体:只有在整个批次完成后才会开始下一批(而不是某个实体一完成就立即补上一个新实体)。您可以根据需要调整批次大小。
评论
`Promise.all` 会返回包含所有 3 个 promise 结果的数组,因此在推入 buffer 时需要将其展开:`buffer.push(...data)`。
请检查以下解决方案是否符合您的需求。这里我不限制 `tasks` 队列的长度,只控制同时执行的任务数量。
let tasks = []; // task factories waiting to be started
let buffer = []; // results of fulfilled tasks, in completion order
let runningTasks = 0; // tasks started but not yet settled
let idleResolvers = []; // resolve callbacks of run() calls waiting for the queue to drain

/**
 * Queue one scraping task per entity and start processing the queue.
 * Assumes `entities` is an array (or other iterable) of entities.
 *
 * @param {number} [limit=3] - maximum number of tasks allowed to run at once.
 * @returns {Promise<Array<*>>} resolves with `buffer` once the queue is empty
 *   and no task is running.
 */
async function run(limit = 3) {
  // `for...of` yields the values, and `const` gives every closure its own entity.
  for (const entity of entities) {
    addPromise(() => scrapingProcessForAnEntity(entity));
  }
  const done = new Promise((resolve) => {
    idleResolvers.push(resolve);
  });
  processQueue(limit);
  return done;
}

/**
 * Append a task factory to the queue. The factory is not called here;
 * processQueue() calls it once a slot is free.
 *
 * @param {() => Promise<*>} prom - function that starts one task and returns its promise.
 */
function addPromise(prom) {
  tasks.push(prom);
}

/**
 * Start queued tasks until `limit` are running or the queue is empty. Each
 * settled task frees its slot and calls processQueue() again, so a new task
 * starts as soon as any running one finishes.
 *
 * @param {number} [limit=3] - maximum number of concurrently running tasks.
 */
function processQueue(limit = 3) {
  while (runningTasks < limit && tasks.length > 0) {
    const task = tasks.shift();
    // Reserve the slot before starting the task so the loop condition stays accurate.
    runningTasks += 1;
    // The async wrapper starts the task now and turns a synchronous throw into a rejection.
    (async () => task())()
      .then(
        (data) => {
          buffer.push(data);
        },
        (error) => {
          // Log and keep going so one failed entity does not stall the queue.
          console.error('Task failed:', error);
        },
      )
      .finally(() => {
        runningTasks -= 1;
        processQueue(limit);
      });
  }
  if (runningTasks === 0 && tasks.length === 0) {
    // Nothing queued and nothing running: wake every waiting run() caller.
    for (const resolve of idleResolvers.splice(0)) {
      resolve(buffer);
    }
  }
}
评论
`tasks.slice(0, 1);` 什么也不做(它返回一个新数组,并不修改 `tasks`)。你是想用 `shift()` 或 `splice` 吗?
这比分块更高效。思路是统计当前正在执行的 Promise 数量,只有在该数量低于上限时才启动下一个。
/**
 * Runs async tasks with at most `size` of them in flight at once.
 * Tasks start in the order they were submitted (FIFO queue). As soon as one
 * settles, the next queued task starts, without waiting for a whole "batch".
 */
class LimitedResourceExecutor {
  #size
  #queue
  #running

  /**
   * @param {number} [size=1] - maximum number of tasks running concurrently:
   *   a positive integer, or Infinity for no limit.
   * @throws {RangeError} if `size` is not a positive integer or Infinity
   *   (with a size of 0 every task would stay queued forever).
   */
  constructor(size = 1) {
    const isValidSize = size === Infinity || (Number.isInteger(size) && size >= 1)
    if (!isValidSize) {
      throw new RangeError(`size must be a positive integer or Infinity, got ${String(size)}`)
    }
    this.#size = size
    this.#queue = []
    this.#running = 0
  }

  /** Maximum number of concurrently running tasks. */
  get size() { return this.#size }

  /**
   * Enqueue `func.call(obj, ...args)` and start it once a slot is free.
   *
   * @param {*} obj - value used as `this` for `func`.
   * @param {Function} func - the task; may return a value or a promise.
   * @param {...*} args - arguments passed to `func`.
   * @returns {Promise<*>} settles with the task's result or error.
   */
  call(obj, func, ...args) {
    return new Promise((resolve, reject) => {
      this.#queue.push([resolve, reject, obj, func, args])
      this.#runQueue()
    })
  }

  // Start the next queued task if a slot is free. When it settles, free the
  // slot and check the queue again. Task errors are routed to the caller's
  // reject(), so the promise returned here is never rejected.
  async #runQueue() {
    if (this.#queue.length === 0 || this.#running >= this.#size) return
    this.#running++
    const [resolve, reject, obj, func, args] = this.#queue.shift()
    try {
      // Awaiting inside the try also turns a synchronous throw from func into a rejection.
      resolve(await func.call(obj, ...args))
    } catch (err) {
      reject(err)
    } finally {
      this.#running--
      this.#runQueue()
    }
  }
}
// Promise-based delay: resolves (with undefined) after `ms` milliseconds.
// Adapted from https://stackoverflow.com/a/39914235/240443
function sleep(ms) {
  return new Promise((wake) => {
    setTimeout(wake, ms)
  })
}
// Demo task: logs when it starts and finishes (in ms since `start`), sleeps
// for `ms` in between, and resolves with its 1-based sequence number.
let id = 0
let start = Date.now()
async function sleepAndLog(ms) {
  const taskId = (id += 1)
  const elapsed = () => Date.now() - start
  console.log(elapsed(), taskId, "starting", ms)
  await sleep(ms)
  console.log(elapsed(), taskId, "finished", ms)
  return taskId
}
// Allow at most two tasks to run at the same time.
const lre = new LimitedResourceExecutor(2)
const timings = [4000, 1000, 1000, 2000, 3000, 1000]
// Submit every task up front; the executor decides when each one actually starts.
const promises = timings.map(ms => lre.call(null, sleepAndLog, ms))
// Promise.all still works, and results keep their submission order.
Promise.all(promises)
  .then(results => {
    console.log("DONE")
    console.log(JSON.stringify(results))
  })
  .catch(err => {
    // Without this, a failing task would surface as an unhandled rejection.
    console.error(err)
  })
检查控制台日志,确认同时执行的任务不超过两个。另请注意,即使任务 1 耗时很长,任务 3 也会在任务 2 完成后立即开始执行,而无需等待任务 1;如果采用分块执行,则必须等到任务 1 完成后才能继续。
评论
moveQueue
addProm
moveQueue
Promise.all