Python: setting size of a queue in a multithreading script

Asked by: zaelcovsky Asked: 11/9/2023 Last edited by: zaelcovsky Updated: 11/9/2023 Views: 31

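Q:

I have a script, client.py, that reads URLs from a txt file and sends them to a server. The script uses multiple threads to send the URLs. While the URLs are read from the txt file, a queue is built, and the threads then take URLs from that queue and send them to the server. Because the file with URLs can be very large, I need to limit the size of the queue. When I specify maxsize, the script gets stuck after the queue is built. If I don't specify maxsize, everything works fine. Where is my mistake? I'm not familiar with threading and queues; maybe I need to use queue.join() or queue.task_done()? If so, where should I call them?

client.py: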

import socket
from pathlib import Path
from threading import Thread
from queue import Queue


class UrlConverter:
    def load(self, filename: str):
        urls_file_path = str(Path(__file__).parent / Path(filename))
        # queue = Queue() # works fine
        queue = Queue(maxsize=10) # stuck
        with open(urls_file_path, 'r', encoding="utf-8") as txt_file:
            for line in txt_file:
                line = line.strip()
                queue.put(line)
        return queue


class Client:
    def execute(self, url: str):
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.connect(('localhost', 59999))
            sock.send(url.encode())
            data = sock.recv(1024)
            sock.close()
            print(f"{url}: {data.decode('utf-8')}\n")
        except Exception as e:
            print("Error sending request to", url, ":", str(e))


class ClientThreads:
    def __init__(self, n: int):
        self.n = n

    def execute(self, queue):
        threads = []
        while not queue.empty():
            while len(threads) < self.n:
                client = Client()
                t = Thread(target=client.execute, args=(queue.get(),))
                t.start()
                threads.append(t)
            for thread in threads:
                thread.join()
            threads = []


def main():
    url_converter = UrlConverter()
    clients_threads = ClientThreads(10)

    urls_queue = url_converter.load('urls.txt')  # load() takes only the filename
    clients_threads.execute(urls_queue)


if __name__ == "__main__":
    main()
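
The stall described above can be reproduced in isolation. The following is a minimal sketch, not part of the original script: the function name `fill_without_consumer` is hypothetical, and a timeout is used only so that the otherwise endless wait becomes a visible `queue.Full` exception. It shows that a bounded `Queue` accepts at most `maxsize` items while nothing is consuming from it.

```python
from queue import Full, Queue


def fill_without_consumer(maxsize: int, items: int) -> int:
    """Try to enqueue `items` entries with no consumer; return how many fit."""
    q = Queue(maxsize=maxsize)
    accepted = 0
    for i in range(items):
        try:
            # A plain q.put(i) would block here once the queue is full,
            # because no thread is calling q.get(). The timeout is only
            # there to turn that silent stall into an exception.
            q.put(i, timeout=0.1)
        except Full:
            break
        accepted += 1
    return accepted


if __name__ == "__main__":
    # 19 items (like the sample urls.txt) into a queue bounded at 10
    print(fill_without_consumer(maxsize=10, items=19))
```

With 19 items and `maxsize=10`, only the first 10 are accepted, which matches the point at which `load()` stops making progress.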

server.py (for example):

import socket
from urllib.error import URLError

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('localhost', 59999))
sock.listen()

while True:
    client, addr = sock.accept()
    result = client.recv(1024)
    try:
        client.send(b'HI from server')
        client.close()
    except (ConnectionResetError, URLError) as e:
        error = f"Error sending request to {result.decode('utf-8')} : {str(e)}"
        client.send(error.encode())
        client.close()

urls.txt (for example):

https://en.wikipedia.org/wiki/Water
https://en.wikipedia.org/wiki/Hydrology
https://en.wikipedia.org/wiki/Aquifer
https://en.wikipedia.org/wiki/Drinking_water
https://en.wikipedia.org/wiki/Water_pollution
https://en.wikipedia.org/wiki/Water_treatment
https://en.wikipedia.org/wiki/Water_quality
https://en.wikipedia.org/wiki/Water_scarcity
https://en.wikipedia.org/wiki/Water_management
https://en.wikipedia.org/wiki/Water_cycle
https://en.wikipedia.org/wiki/Evaporation
https://en.wikipedia.org/wiki/Condensation
https://en.wikipedia.org/wiki/Precipitation
https://en.wikipedia.org/wiki/Runoff
https://en.wikipedia.org/wiki/Infiltration
https://en.wikipedia.org/wiki/Percolation
https://en.wikipedia.org/wiki/Groundwater
https://en.wikipedia.org/wiki/Surface_water
https://en.wikipedia.org/wiki/Water_table
Tags: python, multithreading, sockets, queue

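Comments

1 upvote Michael Butscher 11/9/2023
You try to fill the queue completely in UrlConverter.load before starting the threads that will process the queue items. Possible solution: first create an empty queue, start the threads (which should not terminate on an empty queue), and then fill the queue. When no more items are available to fill the queue, put a special stop item into the queue (as many times as there are client threads) that the threads can recognize and terminate on.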
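
The approach in this comment could be sketched as follows. This is one possible reading of it under stated assumptions, not the commenter's own code: `STOP`, `worker`, `process_file` and the `handle` callback are hypothetical names, and `handle` stands in for whatever sends a URL to the server (for example `Client().execute`).

```python
from queue import Queue
from threading import Thread

STOP = object()  # hypothetical sentinel: one is queued per worker thread


def worker(queue, handle, results):
    """Take items until the sentinel arrives; never exit on an empty queue."""
    while True:
        item = queue.get()  # blocks while the queue is empty
        if item is STOP:
            break
        try:
            results.append(handle(item))
        except Exception as exc:
            # Keep the worker alive so the producer cannot stall on a full queue
            print("Error handling", item, ":", exc)


def process_file(path, handle, n_threads=10, maxsize=10):
    queue = Queue(maxsize=maxsize)
    results = []
    threads = [
        Thread(target=worker, args=(queue, handle, results))
        for _ in range(n_threads)
    ]
    # Start the consumers first, so put() on a full queue only waits briefly
    for t in threads:
        t.start()
    with open(path, "r", encoding="utf-8") as txt_file:
        for line in txt_file:
            url = line.strip()
            if url:
                queue.put(url)  # blocks while `maxsize` items are pending
    # No more input: one stop item per worker thread
    for _ in threads:
        queue.put(STOP)
    for t in threads:
        t.join()
    return results
```

A call such as `process_file("urls.txt", Client().execute)` would then keep at most `maxsize` URLs in memory at a time, since the file is read only as fast as the workers drain the queue.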
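0 upvotes zaelcovsky 11/9/2023
@MichaelButscher Thank you! Do I need to use queue.join() after the threads receive the special stop item? And queue.task_done() after each thread finishes working on a queue item?
1 upvote Michael Butscher 11/10/2023
If the program should simply terminate after the work is done (and not do anything else afterwards), join and task_done are not necessary.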
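
For the other case, where the main thread has to wait for all queued items and then carry on, a minimal sketch of how `task_done()` and `join()` fit together might look like this. The names `worker`, `process_items` and `handle` are hypothetical, and daemon threads are an assumption made here so that no stop item is needed.

```python
from queue import Queue
from threading import Thread


def worker(queue, handle, results):
    while True:
        item = queue.get()
        try:
            results.append(handle(item))
        except Exception as exc:
            print("Error handling", item, ":", exc)
        finally:
            # Exactly one task_done() per get(), even if handle() failed
            queue.task_done()


def process_items(items, handle, n_threads=4, maxsize=10):
    queue = Queue(maxsize=maxsize)
    results = []
    for _ in range(n_threads):
        # Daemon threads do not keep the interpreter alive at exit
        Thread(target=worker, args=(queue, handle, results), daemon=True).start()
    for item in items:
        queue.put(item)  # blocks while the queue is full
    queue.join()  # returns once every queued item has been marked done
    return results


if __name__ == "__main__":
    print(sorted(process_items(range(5), lambda x: x * 2)))
```

After `queue.join()` returns, the main thread can continue with other work, for instance summarising `results`.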

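A:

1 upvote Andrej Kesely 11/9/2023 #1

For this kind of work you don't need a queue. I suggest using ThreadPool from the multiprocessing module. You can set the number of worker threads in the pool with the processes argument (10 in this example). Here is a short example of how you can do that: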

from multiprocessing.pool import ThreadPool
from time import sleep


def work(url):
    # send URL to server here, receive some response
    # ...
    sleep(1)  # simulate work
    response = "Work processed."
    return url, response


def main():
    with ThreadPool(processes=10) as pool, open("urls.txt", "r") as f_url:
        for url, response_from_server in pool.imap_unordered(
            work, map(str.strip, f_url)
        ):
            print(url, response_from_server)


if __name__ == "__main__":
    main()

Prints:

https://en.wikipedia.org/wiki/Aquifer Work processed.
https://en.wikipedia.org/wiki/Water_quality Work processed.
https://en.wikipedia.org/wiki/Hydrology Work processed.
https://en.wikipedia.org/wiki/Water_pollution Work processed.

...

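Comments

0 upvotes zaelcovsky 11/9/2023
Thanks! But I need a solution with a queue. Also, I don't understand: does main read the txt file line by line? It isn't possible to load the whole txt file at once, because it can be very large.
1 upvote Andrej Kesely 11/9/2023
@zaelcovsky open() returns an object that can be iterated lazily (f_url in this case), so you don't read the whole file into memory. Each thread receives one line from the file at a time.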
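
The lazy iteration mentioned in this comment can be checked on its own. The sketch below is an illustration only: `first_stripped_lines` is a hypothetical helper, and the temporary file stands in for urls.txt. It demonstrates just the file side, namely that `map(str.strip, f_url)` hands out lines one at a time; how far ahead a pool pulls from that iterator is a separate matter not shown here.

```python
import os
import tempfile
from itertools import islice


def first_stripped_lines(path, count):
    """Return the first `count` stripped lines without reading the rest."""
    with open(path, "r", encoding="utf-8") as f_url:
        # map() is lazy: a line is pulled only when the consumer asks for it
        stripped = map(str.strip, f_url)
        return list(islice(stripped, count))


if __name__ == "__main__":
    with tempfile.NamedTemporaryFile(
        "w", suffix=".txt", delete=False, encoding="utf-8"
    ) as tmp:
        tmp.write("a\nb\nc\n")
    try:
        print(first_stripped_lines(tmp.name, 2))
    finally:
        os.remove(tmp.name)
```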