异步。队列 - 从 txt 文件放置 url

asyncio.Queue - put urls from txt file

提问人:zaelcovsky 提问时间:11/7/2023 更新时间:11/7/2023 访问量:30

问:

我需要将 txt 文件中的 URL 传递给获取 URL 的脚本,并在 和 的帮助下做一些工作。所以我习惯于将 txt 文件中的 URL 放入队列中。细微差别是txt文件可能非常大,可能不适合内存,我尝试使用队列。但是当限制小于 URL 数量时 - 脚本被卡住了。所以我有两个问题:aiohttpasyncioasyncio.Queuemaxsize

  1. 我的错误在哪里?
  2. 也许有比在这种情况下使用更好的解决方案?asyncio.Queue

script.py:

import asyncio
from aiohttp import ClientSession


async def fetch(url):
    async with ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()


async def main():
    with open("urls.txt", 'r') as f:
        queue = asyncio.Queue(maxsize=5)
        for line in f:
            line = line.strip()
            await queue.put(line)
        # await queue.join() - tried this

    while not queue.empty():
        current_url = await queue.get()
        try:
            content = await fetch(current_url)
            print(f"{current_url}: {content[:15]}")
        except Exception as e:
            print(f"Error fetching {current_url}: {e}")
        finally:
            queue.task_done()

asyncio.run(main())

urls.txt:

https://www.google.com/
https://www.youtube.com/
https://www.facebook.com/
https://www.wikipedia.org/
https://www.amazon.com/
https://www.instagram.com/
https://www.twitter.com/
https://www.tumblr.com/
https://www.pinterest.com/
https://www.reddit.com/

在这个脚本中,我使用 txt 文件中的 URL 数量为 10。我试图添加 after 循环,但没有帮助。脚本仅在没有 >= 个 URL 或未指定时才有效。maxsize=5await queue.join()for line in f:await queue.join()maxsize

异步 队列 python-asyncio aiohttp

评论


答:

2赞 Andrej Kesely 11/7/2023 #1

您的程序结构错误。要使用队列,您需要一个并行从队列中读取的使用者(一个或多个)。

下面是一个简单的示例,如何创建 3 个工作线程,这些工作线程在将 URL 放入队列时卸载内容。最后,我们通过把 to to the queue 来结束 worker:None

import asyncio

from aiohttp import ClientSession


async def fetch(url):
    async with ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()


async def consumer(queue):
    while True:
        current_url = await queue.get()
        try:
            if current_url is None:
                break

            content = await fetch(current_url)
            print(f"{current_url}: {content[:15].strip()}")
        except Exception as e:
            print(f"Error fetching {current_url}: {e}")
        finally:
            queue.task_done()


async def main():
    queue = asyncio.Queue(maxsize=5)

    # for example, we create 3 workers consuming the queue
    workers = {asyncio.Task(consumer(queue)) for _ in range(3)}

    with open("urls.txt", "r") as f:
        for line in f:
            line = line.strip()
            await queue.put(line)

    # end the workers
    for _ in range(len(workers)):
        await queue.put(None)

    await queue.join()


asyncio.run(main())

指纹:

https://www.google.com/: <!doctype html>
https://www.facebook.com/: <!DOCTYPE html>
https://www.wikipedia.org/: <!DOCTYPE html>
https://www.amazon.com/: <!doctype html>
https://www.instagram.com/: <!DOCTYPE html>
https://www.tumblr.com/: <!doctype html
https://www.youtube.com/: <!DOCTYPE html>
https://www.twitter.com/: <!DOCTYPE html>
https://www.pinterest.com/: <!DOCTYPE html>
https://www.reddit.com/: <!DOCTYPE

评论

0赞 zaelcovsky 11/7/2023
谢谢!使用 explicit 来处理大文件是很好的做法吗?Queuemaxsize
1赞 Andrej Kesely 11/7/2023
@zaelcovsky 这完全取决于您的用例。如果你有很多RAM,你可以有更大的maxsize(或无限制)。这也取决于您的工作人员从队列中读取的速度等。