Asked by: zaelcovsky  Asked: 11/7/2023  Updated: 11/7/2023  Views: 30
asyncio.Queue - put urls from txt file
Q:
I need to pass URLs from a txt file to a script that fetches each URL and does some work with it, with the help of asyncio and aiohttp. So I used asyncio.Queue to put the URLs from the txt file into a queue. The nuance is that the txt file may be very large and may not fit into memory, so I tried using a bounded queue with maxsize. But when maxsize is smaller than the number of URLs, the script gets stuck. So I have two questions:
- Where is my mistake?
- Is there maybe a better solution than using asyncio.Queue in this case?
script.py:
import asyncio
from aiohttp import ClientSession

async def fetch(url):
    async with ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

async def main():
    with open("urls.txt", "r") as f:
        queue = asyncio.Queue(maxsize=5)
        for line in f:
            line = line.strip()
            await queue.put(line)
        # await queue.join() - tried this
        while not queue.empty():
            current_url = await queue.get()
            try:
                content = await fetch(current_url)
                print(f"{current_url}: {content[:15]}")
            except Exception as e:
                print(f"Error fetching {current_url}: {e}")
            finally:
                queue.task_done()

asyncio.run(main())
urls.txt:
https://www.google.com/
https://www.youtube.com/
https://www.facebook.com/
https://www.wikipedia.org/
https://www.amazon.com/
https://www.instagram.com/
https://www.twitter.com/
https://www.tumblr.com/
https://www.pinterest.com/
https://www.reddit.com/
In this script I use maxsize=5, and the number of URLs in the txt file is 10. I tried adding await queue.join() after the for line in f: loop, but it didn't help. The script only works when maxsize >= the number of URLs, or when maxsize is not specified at all.
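To illustrate, the blocking behaviour can be reproduced without aiohttp at all: with no consumer task running, await queue.put() simply suspends forever once the queue holds maxsize items. A minimal sketch (the timeout is added here only to make the hang observable, it is not part of the original script):

```python
import asyncio

async def main():
    queue = asyncio.Queue(maxsize=2)
    await queue.put("url-a")
    await queue.put("url-b")   # queue is now full
    try:
        # With no consumer running, this put() would suspend forever;
        # wait_for() turns the infinite hang into a visible timeout.
        await asyncio.wait_for(queue.put("url-c"), timeout=0.5)
    except asyncio.TimeoutError:
        print("put() blocked: queue full and nothing consuming")

asyncio.run(main())
```

This is exactly the state the script above ends up in: the producer loop fills the queue to maxsize and then waits on put(), but the consuming while-loop only starts after the producer loop finishes, so neither side can make progress.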
A:
2 upvotes
Andrej Kesely
11/7/2023
#1
Your program has the wrong structure. To use a queue, you need one or more consumers that read from the queue in parallel.
Here is a simple example of how to create 3 workers that download the content while the URLs are being put into the queue. At the end, we shut the workers down by putting None onto the queue:
import asyncio
from aiohttp import ClientSession

async def fetch(url):
    async with ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

async def consumer(queue):
    while True:
        current_url = await queue.get()
        try:
            if current_url is None:
                break
            content = await fetch(current_url)
            print(f"{current_url}: {content[:15].strip()}")
        except Exception as e:
            print(f"Error fetching {current_url}: {e}")
        finally:
            queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=5)
    # for example, we create 3 workers consuming the queue
    workers = {asyncio.Task(consumer(queue)) for _ in range(3)}
    with open("urls.txt", "r") as f:
        for line in f:
            line = line.strip()
            await queue.put(line)
    # end the workers
    for _ in range(len(workers)):
        await queue.put(None)
    await queue.join()

asyncio.run(main())
Prints:
https://www.google.com/: <!doctype html>
https://www.facebook.com/: <!DOCTYPE html>
https://www.wikipedia.org/: <!DOCTYPE html>
https://www.amazon.com/: <!doctype html>
https://www.instagram.com/: <!DOCTYPE html>
https://www.tumblr.com/: <!doctype html
https://www.youtube.com/: <!DOCTYPE html>
https://www.twitter.com/: <!DOCTYPE html>
https://www.pinterest.com/: <!DOCTYPE html>
https://www.reddit.com/: <!DOCTYPE
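As an alternative to the None sentinels, a common variant of the same worker pattern waits for queue.join() and then cancels the idle workers. A sketch of that approach (aiohttp left out here, the print stands in for the real fetch):

```python
import asyncio

async def worker(queue):
    while True:
        item = await queue.get()
        try:
            print(f"processed {item}")   # real work (e.g. fetch) would go here
        finally:
            queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=5)
    workers = [asyncio.create_task(worker(queue)) for _ in range(3)]
    for item in ["url1", "url2", "url3", "url4", "url5", "url6", "url7"]:
        await queue.put(item)
    await queue.join()       # blocks until every item has been task_done()
    for w in workers:
        w.cancel()           # workers are now idle in queue.get(), safe to cancel
    await asyncio.gather(*workers, return_exceptions=True)

asyncio.run(main())
```

The trade-off: cancellation saves the bookkeeping of one sentinel per worker, while sentinels let each worker finish cleanly on its own.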
Comments
0 upvotes
zaelcovsky
11/7/2023
Thanks! Is it good practice to use a Queue with an explicit maxsize when processing large files?
1 upvote
Andrej Kesely
11/7/2023
@zaelcovsky It all depends on your use case. If you have plenty of RAM, you can use a larger maxsize (or leave the queue unbounded). It also depends on how fast your workers read from the queue, etc.
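A small self-contained sketch (with made-up numbers) showing what the bound buys you: with maxsize set, the producer is throttled by the consumer, so the number of items held in memory never exceeds maxsize no matter how large the input is:

```python
import asyncio

async def producer(queue, n):
    for i in range(n):
        await queue.put(i)   # suspends whenever the queue holds maxsize items
    await queue.put(None)    # sentinel to stop the consumer

async def consumer(queue):
    peak = 0
    while True:
        peak = max(peak, queue.qsize())
        item = await queue.get()
        if item is None:
            break
    return peak

async def main():
    queue = asyncio.Queue(maxsize=10)
    _, peak = await asyncio.gather(producer(queue, 1000), consumer(queue))
    print(f"peak queue size: {peak} (never exceeds maxsize=10)")

asyncio.run(main())
```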