从硬盘读取文件时,Python Asyncio 比线程慢得多

python asyncio is much slower than threads when reading files from hard disk

提问人:coin cheung 提问时间:9/14/2023 更新时间:9/14/2023 访问量:77

问:

大约有 1M 张图片,我需要读取它们并使用 python 将字节插入 redis 中。我有两个选择,第一个是使用线程池,第二个是使用 asyncio,因为这只是 IO 任务。但是,我发现线程池方法比异步方法快得多。一段示例代码如下:

import pickle
import os
import os.path as osp
import re
import redis
import asyncio

from multiprocessing.dummy import Pool


r = redis.StrictRedis(host='localhost', port=6379, db=1)



data_root = './datasets/images'

print('obtain name and paths')
paths_names = []
for root, dis, fls in os.walk(data_root):
    for fl in fls:
        if re.search('JPEG$', fl) is None: continue
        pth = osp.join(root, fl)
        name = re.sub('\.+/', '', pth)
        name = re.sub('/', '-', name)
        name = 'redis-' + name
        paths_names.append((pth, name))
print('num samples in total: ', len(paths_names))


### this is slower
print('insert into redis')
async def insert_one(path_name):
    pth, name = path_name
    if r.get(name): return
    with open(pth, 'rb') as fr:
        binary = fr.read()
    r.set(name, binary)

async def func(cid, n_co):
    num = len(paths_names)
    for i in range(cid, num, n_co):
        await insert_one(paths_names[i])

n_co = 256
loop = asyncio.get_event_loop()
tasks = [loop.create_task(func(cid, n_co)) for cid in range(n_co)]
fut = asyncio.gather(*tasks)
loop.run_until_complete(fut)
loop.close()



### this is more than 10x faster
def insert_one(path_name):
    pth, name = path_name
    if r.get(name): return
    with open(pth, 'rb') as fr:
        binary = fr.read()
    r.set(name, binary)

def func(cid, n_co):
    num = len(paths_names)
    for i in range(cid, num, n_co):
        insert_one(paths_names[i])

with Pool(128) as pool:
    pool.map(func, paths_names)

在这里,我有两个问题让我很困惑:

  1. asyncio 方法有什么问题,它比线程方法慢?

  2. 是否鼓励向函数添加数百万个任务?喜欢这个:gather

    num_parallel = 1000000000
    tasks = [loop.create_task(fetch_func(cid, num_parallel)) for cid in range(num_parallel)]
    await asyncio.gather(*tasks)

python io python-asyncio

评论


答:

1赞 lxop 9/14/2023 #1

实际上,您并没有将异步用于 I/O 操作,您仍在对磁盘读取和 Redis 读/写执行同步阻止调用。

尝试更改异步方法以使用异步 Redis 方法。特别是,使用 aioredis(曾经是一个单独的库,现在它是主 redis 库的一部分)。

import aioredis  # if you're using redis library version < 4.2.0rc1
# or
from redis import asyncio as aioredis  # if you're using redis >= 4.2.0rc1

aior = aioredis.from_url(
    "redis://localhost", encoding="utf-8", decode_responses=True
)

# ...
  if await aior.get(name): return
# ...
  await aoir.set(name, binary)

看看这种变化对你的速度差异有多大影响。如果您也想让文件 I/O 异步,请尝试 aiofile

from aiofile import async_open

# ...
  async with async_open(pth, 'rb') as fr:
    binary = await fr.read()
# ...

评论

0赞 coin cheung 9/14/2023
在这种情况下,是否可以对 IO 使用异步?让它工作的正确方法是什么?
2赞 lxop 9/14/2023
我上面展示的内容将对 redis 所需的网络 I/O 使用异步,最好在尝试更改文件 I/O 之前先尝试该更改。如果您也想异步执行文件 I/O,请查看 aiofile
0赞 coin cheung 9/14/2023
第二个问题是什么?添加数百万个任务是一个好的重救星吗?gather
2赞 Chris 9/14/2023 #2

LXOP 关于您的第一个问题是正确的。

关于你的第二个问题。添加 100 万个任务来收集是可以的。但是,如果它们同时运行并打开连接,则会出现问题。这通常会导致更多的拥塞,因为服务器/文件系统只允许有限数量的连接。此外,它还可能导致连接超时或完全断开。

一种解决方案是在协程中使用信号量

async def insert_one(.., semaphore):
    with semaphore:
       .. # logic from lxop

semaphore= asyncio.Semaphore(10)
tasks = [loop.create_task(insert_one(.., semaphore)) for .. in ..]

此信号量将确保最多 10 个插件同时运行。您可以使用这个数字,看看什么最有效,10-1000 之间的任何数字都可以使用,具体取决于您的服务器设置和连接。