Asked by: zaelcovsky  Asked: 11/9/2023  Last edited by: zaelcovsky  Updated: 11/9/2023  Views: 31
Python: setting size of a queue in a multithreading script
Q:
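I have a script that reads URLs from a txt file and sends them to a server. The script uses multiple threads to send the URLs. As the URLs are read from the txt file, a queue is built, and the threads then use that queue to send the URLs to the server. Since the file with URLs can be very large, I need to limit the size of the queue. When I specify maxsize, the script gets stuck after the queue is formed. If I don't specify maxsize, everything works fine.
Where is my mistake? I'm not familiar with threading and queues; maybe I need to use queue.join() or queue.task_done()? If so, where should I use them?
client.py: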
import socket
from pathlib import Path
from threading import Thread
from queue import Queue


class UrlConverter:
    def load(self, filename: str):
        urls_file_path = str(Path(__file__).parent / Path(filename))
        # queue = Queue()  # works fine
        queue = Queue(maxsize=10)  # stuck
        with open(urls_file_path, 'r', encoding="utf-8") as txt_file:
            for line in txt_file:
                line = line.strip()
                queue.put(line)
        return queue


class Client:
    def execute(self, url: str):
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.connect(('localhost', 59999))
            sock.send(url.encode())
            data = sock.recv(1024)
            sock.close()
            print(f"{url}: {data.decode('utf-8')}\n")
        except Exception as e:
            print("Error sending request to", url, ":", str(e))


class ClientThreads:
    def __init__(self, n: int):
        self.n = n

    def execute(self, queue):
        threads = []
        while not queue.empty():
            while len(threads) < self.n:
                client = Client()
                t = Thread(target=client.execute, args=(queue.get(),))
                t.start()
                threads.append(t)
            for thread in threads:
                thread.join()
            threads = []


def main():
    url_converter = UrlConverter()
    clients_threads = ClientThreads(10)
    urls_queue = url_converter.load('urls.txt')
    clients_threads.execute(urls_queue)


if __name__ == "__main__":
    main()
server.py (for example):
import socket
from urllib.error import URLError

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('localhost', 59999))
sock.listen()
while True:
    client, addr = sock.accept()
    result = client.recv(1024)
    try:
        client.send(b'HI from server')
        client.close()
    except (ConnectionResetError, URLError) as e:
        error = f"Error sending request to {result.decode('utf-8')} : {str(e)}"
        client.send(error.encode())
        client.close()
urls.txt (for example):
https://en.wikipedia.org/wiki/Water
https://en.wikipedia.org/wiki/Hydrology
https://en.wikipedia.org/wiki/Aquifer
https://en.wikipedia.org/wiki/Drinking_water
https://en.wikipedia.org/wiki/Water_pollution
https://en.wikipedia.org/wiki/Water_treatment
https://en.wikipedia.org/wiki/Water_quality
https://en.wikipedia.org/wiki/Water_scarcity
https://en.wikipedia.org/wiki/Water_management
https://en.wikipedia.org/wiki/Water_cycle
https://en.wikipedia.org/wiki/Evaporation
https://en.wikipedia.org/wiki/Condensation
https://en.wikipedia.org/wiki/Precipitation
https://en.wikipedia.org/wiki/Runoff
https://en.wikipedia.org/wiki/Infiltration
https://en.wikipedia.org/wiki/Percolation
https://en.wikipedia.org/wiki/Groundwater
https://en.wikipedia.org/wiki/Surface_water
https://en.wikipedia.org/wiki/Water_table
A:
1 upvote
Andrej Kesely
11/9/2023
#1
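For this kind of job you don't need a queue; I recommend using ThreadPool from the multiprocessing module. You can set the number of workers in the thread pool through the processes argument (10 in this case). Here is a short example of how you can do it: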
from multiprocessing.pool import ThreadPool
from time import sleep


def work(url):
    # send URL to server here, receive some response
    # ...
    sleep(1)  # simulate work
    response = "Work processed."
    return url, response


def main():
    with ThreadPool(processes=10) as pool, open("urls.txt", "r") as f_url:
        for url, response_from_server in pool.imap_unordered(
            work, map(str.strip, f_url)
        ):
            print(url, response_from_server)


if __name__ == "__main__":
    main()
Prints:
https://en.wikipedia.org/wiki/Aquifer Work processed.
https://en.wikipedia.org/wiki/Water_quality Work processed.
https://en.wikipedia.org/wiki/Hydrology Work processed.
https://en.wikipedia.org/wiki/Water_pollution Work processed.
...
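The placeholder in work() can be filled in with the socket exchange from the question's client.py. The following is a minimal sketch, assuming the server from server.py is listening on localhost:59999; the host and port parameters, the 5-second timeout, and the path argument of main are additions for illustration and are not part of the original answer.

```python
import socket
from multiprocessing.pool import ThreadPool

HOST, PORT = "localhost", 59999  # values taken from the question's server.py


def work(url, host=HOST, port=PORT):
    """Send one URL to the server and return (url, reply text)."""
    try:
        # create_connection opens a TCP socket; the with-block closes it
        with socket.create_connection((host, port), timeout=5) as sock:
            sock.sendall(url.encode())
            data = sock.recv(1024)
        return url, data.decode("utf-8")
    except OSError as e:
        # connection refused, timeout, reset, etc.
        return url, f"error: {e}"


def main(path="urls.txt"):
    with ThreadPool(processes=10) as pool, open(path, "r", encoding="utf-8") as f_url:
        # skip blank lines and strip the trailing newline from each URL
        lines = (line.strip() for line in f_url if line.strip())
        for url, response in pool.imap_unordered(work, lines):
            print(url, response)
```

Calling main() with the server running should print one line per URL, in whatever order the replies arrive.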
Comments
0 upvotes
zaelcovsky
11/9/2023
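Thanks! But I need a solution with a queue. Also, I don't understand: does main read the txt file line by line? It is not possible to load the whole txt file at once, because it can be very large.
1 upvote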
Andrej Kesely
11/9/2023
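@zaelcovsky open() returns an object that can be iterated lazily (in this case f_url), so you don't read the whole file into memory. Each thread receives one line from the file at a time.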
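If a queue is a requirement, one likely reason the original script hangs is that UrlConverter.load fills the queue before any thread is taking items out of it: Queue.put blocks by default once maxsize items are waiting, so the 11th put never returns. Below is a minimal sketch of one way around this, assuming the workers are started before the file is read. The names worker, process_lines and handle, and the use of None as a stop marker, are hypothetical choices for illustration.

```python
from queue import Queue
from threading import Thread

SENTINEL = None  # hypothetical stop marker; real lines are never None


def worker(queue, handle, results):
    """Take items off the queue until the stop marker arrives."""
    while True:
        item = queue.get()
        try:
            if item is SENTINEL:
                return
            results.append(handle(item))  # list.append is thread-safe in CPython
        except Exception as e:
            results.append((item, e))  # keep the worker alive if one item fails
        finally:
            queue.task_done()  # one task_done() per get(), so join() can return


def process_lines(lines, handle, n_threads=10, maxsize=10):
    """Feed lines through a bounded queue to n_threads worker threads."""
    queue = Queue(maxsize=maxsize)
    results = []
    # Start the consumers first, so put() below always has someone draining the queue
    threads = [
        Thread(target=worker, args=(queue, handle, results))
        for _ in range(n_threads)
    ]
    for t in threads:
        t.start()
    for line in lines:
        line = line.strip()
        if line:
            queue.put(line)  # blocks while the queue is full, then resumes
    for _ in threads:
        queue.put(SENTINEL)  # one stop marker per worker
    queue.join()  # wait until every queued item has been marked done
    for t in threads:
        t.join()
    return results
```

With the classes from the question, this could be called as process_lines(txt_file, Client().execute) inside a with open(...) block. At most maxsize lines wait in the queue at any moment, because put() pauses the reading loop until a worker frees a slot.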