Python multithreading: Obtain worker thread results in main thread

Asked by: knightcool Asked: 9/8/2020 Updated: 9/8/2020 Views: 1154

Question:

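On Python 3.8, I have implemented multithreading for network I/O tasks: a set of worker threads downloads some data from the network, processes it, and builds its own list of results. When all threads have finished, I want the main thread to collect every worker's result list and process them further.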

For this discussion I have removed the network I/O calls and put in some dummy code instead. Here is what it looks like:

from queue import Queue
from threading import Thread
from random import randint as ri

class DownloadWorker(Thread):
    def __init__(self, queue, result_q):
        Thread.__init__(self)
        self.queue = queue
        self.result_q = result_q

    def run(self):
        while True:
            start_val = self.queue.get()
            try:
                # dummy code. Real code has network calls here
                thread_output = [ri(0, 10) + start_val, ri(0, 10) + start_val, ri(0, 10) + start_val]
                self.result_q.put(thread_output)
            finally:
                self.queue.task_done()

def main():
    queue = Queue()  # Communication between main thread and its workers
    result_q = Queue()  # Result queue so workers results can finally be pooled together by main thread

    # Create 2 worker threads
    for x in range(2):
        worker = DownloadWorker(queue, result_q)
        # Setting daemon to True will let the main thread exit even if worker threads block
        worker.daemon = True
        worker.start()

    start_values = [10, 100]  # pass start value to differentiate between thread outputs
    for start_val in start_values:
        queue.put(start_val)
    queue.join()

    # Both workers' tasks are done. Now let's pool the results (just printing here for simplification)
    while not result_q.empty():
        print(result_q.get())


if __name__ == '__main__':
    main()

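So far this code works fine, but I would like to know whether there is a better way to pool the results in the main thread when using multithreading in Python 3.8. I looked at this old thread, but when I adapted it to my requirements it raised errors (frankly, I don't fully understand the solution there).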
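For comparison, if no pool or result queue is wanted, a plain Thread subclass can keep its own result as an attribute that the main thread reads once join() has returned. This is a minimal sketch, assuming one thread per start value (unlike the queue-fed workers above); the class name ResultThread and its result attribute are hypothetical illustrations, not part of the question's code.

```python
from threading import Thread
from random import randint as ri


class ResultThread(Thread):
    """Hypothetical worker: handles one start value and stores its own result."""

    def __init__(self, start_val):
        super().__init__()
        self.start_val = start_val
        self.result = None  # set by run(); read by the main thread after join()

    def run(self):
        # dummy code standing in for the network calls
        self.result = [ri(0, 10) + self.start_val for _ in range(3)]


def main():
    start_values = [10, 100]
    threads = [ResultThread(v) for v in start_values]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # once join() returns, t.result has been written
    results = [t.result for t in threads]
    for r in results:
        print(r)
    return results


if __name__ == '__main__':
    main()
```

This trades the task queue for one thread per task, so it suits a small, fixed number of downloads; with many tasks a bounded pool is the better fit.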

Any advice on this would be appreciated!

python-3.x python-multithreading

Answer:

1 vote · Booboo · 9/8/2020 · #1

You have reinvented your own thread pool, which is already provided by the ThreadPoolExecutor class in the concurrent.futures module:

import concurrent.futures
from random import randint as ri


def worker(start_val):
    # dummy code. Real code has network calls here
    return [ri(0, 10) + start_val, ri(0, 10) + start_val, ri(0, 10) + start_val]


def main():
    NUMBER_THREADS = 2
    with concurrent.futures.ThreadPoolExecutor(max_workers=NUMBER_THREADS) as executor:
        start_values = [10, 100]  # pass start value to differentiate between thread outputs
        # method submit returns a Future instance, which encapsulates the asynchronous execution of a callable:
        futures = [executor.submit(worker, start_val) for start_val in start_values]
        for future in futures:
            result = future.result() # block until a result is returned
            print(result)
        # or you can do: results = executor.map(worker, start_values)

if __name__ == '__main__':
    main()

Prints:

[20, 14, 11]
[104, 104, 108]
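The comment in the answer's code mentions executor.map as an alternative. A minimal sketch of that variant, plus one based on concurrent.futures.as_completed, is below; the function names results_in_order and results_as_completed are illustrative, and worker is the same dummy function as in the answer. map() returns results in input order, while as_completed() yields futures as they finish, which lets failures from individual tasks (in real code, e.g. a failed network call) be recorded without losing the other results.

```python
import concurrent.futures
from random import randint as ri


def worker(start_val):
    # dummy code standing in for the network calls
    return [ri(0, 10) + start_val for _ in range(3)]


def results_in_order(start_values, n_threads=2):
    """Collect results in the same order as start_values, via executor.map()."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=n_threads) as executor:
        # map() returns an iterator; list() drains it. If a call raised,
        # its exception is re-raised here when that result is reached.
        return list(executor.map(worker, start_values))


def results_as_completed(start_values, n_threads=2):
    """Collect results as each task finishes, keeping failures separate."""
    results, errors = {}, {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=n_threads) as executor:
        # Map each Future back to the input value that produced it
        future_to_val = {executor.submit(worker, v): v for v in start_values}
        for future in concurrent.futures.as_completed(future_to_val):
            start_val = future_to_val[future]
            try:
                results[start_val] = future.result()
            except Exception as exc:  # a worker exception is re-raised by result()
                errors[start_val] = exc
    return results, errors


if __name__ == '__main__':
    print(results_in_order([10, 100]))
    ok, failed = results_as_completed([10, 100])
    print(ok, failed)
```

Keying results by input value in results_as_completed keeps the output independent of which thread happens to finish first.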

Comments:

0 votes · knightcool · 9/9/2020
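Perfect! "concurrent.futures" is what I had been missing. I had been looking into the multiprocessing module instead, which seems to offer far more functionality than a simple task like mine needs and has a very steep learning curve. Thanks for the suggestion!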