提问人:coin cheung 提问时间:9/14/2023 更新时间:9/14/2023 访问量:77
从硬盘读取文件时,Python Asyncio 比线程慢得多
python asyncio is much slower than threads when reading files from hard disk
问:
大约有 1M 张图片,我需要读取它们并使用 python 将字节插入 redis 中。我有两个选择,第一个是使用线程池,第二个是使用 asyncio,因为这只是 IO 任务。但是,我发现线程池方法比异步方法快得多。一段示例代码如下:
import pickle
import os
import os.path as osp
import re
import redis
import asyncio
from multiprocessing.dummy import Pool
r = redis.StrictRedis(host='localhost', port=6379, db=1)
data_root = './datasets/images'
print('obtain name and paths')
paths_names = []
for root, dis, fls in os.walk(data_root):
for fl in fls:
if re.search('JPEG$', fl) is None: continue
pth = osp.join(root, fl)
name = re.sub('\.+/', '', pth)
name = re.sub('/', '-', name)
name = 'redis-' + name
paths_names.append((pth, name))
print('num samples in total: ', len(paths_names))
### this is slower
print('insert into redis')
async def insert_one(path_name):
pth, name = path_name
if r.get(name): return
with open(pth, 'rb') as fr:
binary = fr.read()
r.set(name, binary)
async def func(cid, n_co):
num = len(paths_names)
for i in range(cid, num, n_co):
await insert_one(paths_names[i])
n_co = 256
loop = asyncio.get_event_loop()
tasks = [loop.create_task(func(cid, n_co)) for cid in range(n_co)]
fut = asyncio.gather(*tasks)
loop.run_until_complete(fut)
loop.close()
### this is more than 10x faster
def insert_one(path_name):
pth, name = path_name
if r.get(name): return
with open(pth, 'rb') as fr:
binary = fr.read()
r.set(name, binary)
def func(cid, n_co):
num = len(paths_names)
for i in range(cid, num, n_co):
insert_one(paths_names[i])
with Pool(128) as pool:
pool.map(func, paths_names)
在这里,我有两个问题让我很困惑:
asyncio 方法有什么问题,它比线程方法慢?
是否鼓励向函数添加数百万个任务?喜欢这个:
gather
num_parallel = 1000000000
tasks = [loop.create_task(fetch_func(cid, num_parallel)) for cid in range(num_parallel)]
await asyncio.gather(*tasks)
答:
1赞
lxop
9/14/2023
#1
实际上,您并没有将异步用于 I/O 操作,您仍在对磁盘读取和 Redis 读/写执行同步阻止调用。
尝试更改异步方法以使用异步 Redis 方法。特别是,使用 aioredis(曾经是一个单独的库,现在它是主 redis 库的一部分)。
import aioredis # if you're using redis library version < 4.2.0rc1
# or
from redis import asyncio as aioredis # if you're using redis >= 4.2.0rc1
aior = aioredis.from_url(
"redis://localhost", encoding="utf-8", decode_responses=True
)
# ...
if await aior.get(name): return
# ...
await aoir.set(name, binary)
看看这种变化对你的速度差异有多大影响。如果您也想让文件 I/O 异步,请尝试 aiofile:
from aiofile import async_open
# ...
async with async_open(pth, 'rb') as fr:
binary = await fr.read()
# ...
评论
0赞
coin cheung
9/14/2023
在这种情况下,是否可以对 IO 使用异步?让它工作的正确方法是什么?
2赞
lxop
9/14/2023
我上面展示的内容将对 redis 所需的网络 I/O 使用异步,最好在尝试更改文件 I/O 之前先尝试该更改。如果您也想异步执行文件 I/O,请查看 aiofile
0赞
coin cheung
9/14/2023
第二个问题是什么?添加数百万个任务是一个好的重救星吗?gather
2赞
Chris
9/14/2023
#2
LXOP 关于您的第一个问题是正确的。
关于你的第二个问题。添加 100 万个任务来收集是可以的。但是,如果它们同时运行并打开连接,则会出现问题。这通常会导致更多的拥塞,因为服务器/文件系统只允许有限数量的连接。此外,它还可能导致连接超时或完全断开。
一种解决方案是在协程中使用信号量。
async def insert_one(.., semaphore):
with semaphore:
.. # logic from lxop
semaphore= asyncio.Semaphore(10)
tasks = [loop.create_task(insert_one(.., semaphore)) for .. in ..]
此信号量将确保最多 10 个插件同时运行。您可以使用这个数字,看看什么最有效,10-1000 之间的任何数字都可以使用,具体取决于您的服务器设置和连接。
评论