js 中的异步有序队列执行器

Async ordered queue executer in js

提问人:anonymous 提问时间:11/16/2023 最后编辑:jabaaanonymous 更新时间:11/16/2023 访问量:44

问:

我有 NodeJS 应用程序,我在其中连接 WS 服务器,侦听新消息 - 在每条消息上,我都做aysnc工作。

问题: 这些工作需要时间 - 我想按照它们收到的顺序一次运行它们。

例如: message1 -> job(message) // 1000ms message2 -> job2(message2) // 5000ms messgae3 ->(在 job2 完成之前收到,但仅在 job2 完成后执行) -> job2(message3)

因此,我创建了这些类来模拟 NodeJS 应用程序的 beavior,并且在此代码中,作业仍未按顺序运行,并且看起来它没有等待前一个完成(不确定)

class AsyncQueue {
    constructor() {
        this.active = false
        this.queue = []
    }

    async doJobs() {
        if (this.active) {
            return
        }

        this.active = true
        while (this.queue.length > 0) {
            const fn = this.queue.shift() // take first
            await fn()
        }
        this.active = false
    }

    push(fn) {
        this.queue.push(fn) // push to last
        this.doJobs()
    }
}

class Utils {
    static randInt(min, max) {
        return Math.random() * (max - min) + min
    } 
    
    static sleep(ms) {
        return new Promise(resolve => setTimeout(resolve, ms))
    }
}


class WsServer {
    static async onMessage(func) {
        while (true) {
            await Utils.sleep(1000)
            func()
        }
    }
}


async function job(name) {
    console.log(`Doing job ${name}`)
    await Utils.sleep(Utils.randInt(1000, 5000)) // Fake job takes time
    console.log(`Done doing job ${name}`)
}

async function main() {
    const queue = new AsyncQueue()
    let counter = 0
    WsServer.onMessage(() => {
        counter += 1;
        queue.push(() => job(counter))
    })
}

main()

JavaScript 异步 async-await 队列

评论

0赞 Jake 11/16/2023
我试过了,但它仍然不同步地运行作业
0赞 jabaa 11/16/2023
似乎闭包是你的问题 stackoverflow.com/questions/50923825/let-vs-var-in-a-for-loop 计数器递增,函数使用递增的值,而不是推送函数时计数器的值

答:

0赞 jabaa 11/16/2023 #1

函数已按顺序处理。计数器让您感到困惑。计数器的混淆行为是由闭包引起的,如 for 循环中的 Let vs. var 中所述

该函数被推送到队列中,并可通过闭包访问计数器。推送函数后,计数器递增,函数打印递增的计数器。你可以用下面的代码看到它:

class AsyncQueue {
    constructor() {
        this.active = false
        this.queue = []
    }

    async doJobs() {
        if (this.active) {
            return
        }

        this.active = true
        while (this.queue.length > 0) {
            const fn = this.queue.shift() // take first
            await fn()
        }
        this.active = false
    }

    push(fn) {
        this.queue.push(fn) // push to last
        this.doJobs()
    }
}

class Utils {
    static randInt(min, max) {
        return Math.random() * (max - min) + min
    } 
    
    static sleep(ms) {
        return new Promise(resolve => setTimeout(resolve, ms))
    }
}


class WsServer {
    static async onMessage(func) {
        while (true) {
            await Utils.sleep(1000)
            func()
        }
    }
}


async function job(name) {
    console.log(`Doing job ${name}`)
    await Utils.sleep(Utils.randInt(1000, 5000)) // Fake job takes time
    console.log(`Done doing job ${name}`)
}

async function main() {
    const queue = new AsyncQueue()
    let counter = 0
    WsServer.onMessage(() => {
        counter += 1;
        queue.push(((counter) => () => job(counter))(counter))
    })
}

main()

在函数的处理过程中,我没有做任何改变。我只更改了函数访问计数器的方式。

问题的简单版本:

const list = [];

let i = 0;
list.push(() => console.log(i));
++i;
list.push(() => console.log(i));
++i;
list.push(() => console.log(i));
++i;
list.push(() => console.log(i));

list.forEach(f => f());

和简单的解决方案:

const list = [];

let i = 0;
list.push(((i) => () => console.log(i))(i));
++i;
list.push(((i) => () => console.log(i))(i));
++i;
list.push(((i) => () => console.log(i))(i));
++i;
list.push(((i) => () => console.log(i))(i));

list.forEach(f => f());

IMO 它不是重复的,因为它与循环无关,并且无法使用 instead 来解决。letvar

评论

0赞 Jaromanda X 11/16/2023
相反,您可以完全不需要更改任何其他代码:pqueue.push(((v) => () => job(v))(counter));
0赞 jabaa 11/16/2023
@JaromandaX 额外的阵列是为我准备的。我需要它来调试和理解问题。如果 OP 也想使用和调试它,它可能会很有用。
0赞 Jaromanda X 11/16/2023
很公平......还有更简单的const v = ++counter; queue.push(() => job(v));.onMessage
0赞 jabaa 11/16/2023
@JaromandaX 是的,常见的解决方案是将值作为参数传递给函数。你怎么做,是你的选择。我认为,在这种情况下这并不重要,因为 OP 用它来证明一个不存在的问题。函数按顺序处理。OP 使用计数器进行调试。如果 OP 想要修复此行为,我发现了几十个重复/类似的问题。