提问人:Red 提问时间:11/2/2023 最后编辑:Red 更新时间:11/7/2023 访问量:90
Python 与 concurrent.futures.ProcessPoolExecutor(或多处理)的并行性。Pool) 和 AsyncIO
Python parallelism with concurrent.futures.ProcessPoolExecutor (or multiprocessing.Pool) and AsyncIO
问:
目前,我正在开发一个项目,该项目可以监听每个 API 请求,对其进行检查并将其发送回去。可悲的是,API 设法使我的单线程 AsyncIO 和多线程尝试过载,但 CPU 从未达到负载的 20%。
所以我的想法是使用多处理并完全加载 (70-90%) CPU。最好的方法(我认为)是使用 or 因为我可以简单地设置和处理 CPU 可能处理的尽可能多的请求。但我失败了。concurrent.futures.ProcessPoolExecutor
multiprocessing.Pool
max_workers
max_tasks_per_child
asyncio
我目前的问题是在我的自定义队列系统中创建进程。确切地说,我不完全知道如何创建一个隔离的进程,该进程将:
- 生成循环
asyncio
- 运行具有 3 秒超时的异步函数
asyncio.TaskGroup()
- 运行所有任务
- 返回数据
基本上,我想创建所需检查函数的副本(队列系统提供它们),在隔离进程中运行它们并返回数据。
方法如下:add_queue
async def add_queue(self, function: str, id: int | None, **kwargs) -> None:
functions: tuple | None = self.filter.get(function)
process_executor: syscon.concurrent.futures.ProcessPoolExecutor = self.CONNECTOR.process_executor
if not functions:
self.logger.log(level="DEBUG", message=f"@Queue | No match found for {function}.")
elif guild_id is None:
self.logger.log(level="DEBUG", message=f"@Queue | No ID found in {function} request.")
elif process_executor is None:
self.logger.log(level="ERROR", message="ProcessPoolExecutor does NOT exist!")
else:
try:
self.logger.log(level="DEBUG", message=f"@Queue | Submitting task to ProcessPoolExecutor.")
self.process_executor.submit(self.process_event_runner, functions=functions, id=id, **kwargs)
except Exception as error:
self.logger.log(level="ERROR", message=f"@Queue | {type(error).__name__}: {error}")
以及被调用的方法:
def process_event_runner(self, functions: tuple, id: int, **inputedargs) -> None:
self.logger.log(level="DEBUG", message=f"@Queue.process_event_runner | Runner executing.")
event_loop: asyncio.AbstractEventLoop = asyncio.new_event_loop()
try:
event_loop.run_until_complete(asyncio.wait_for(async_task_worker(functions=functions, loop=event_loop, id=id, **inputedargs), timeout=3))
except asyncio.TimeoutError:
self.logger.log(level="WARNING", message=f"@Queue.process_event_runner | Task timed out. Terminating.")
except asyncio.CancelledError:
self.logger.log(level="WARNING", message=f"@Queue.process_event_runner | Task cancelled. Terminating.")
except ExceptionGroup as groupError:
self.logger.log(level="ERROR", message=f"@Queue.process_event_runner | Exception Group Error: {', '.join([f'SubE {num}: {exception}' for num, exception in enumerate(groupError.exceptions, 1)])}")
except Exception as error:
self.logger.log(level="ERROR", message=f"@Queue.process_event_runner | {type(error).__name__}: {error}")
async def async_task_worker(functions, loop: asyncio.AbstractEventLoop, id: int, **inputedargs):
asyncio.set_event_loop(loop)
async with asyncio.TaskGroup() as group:
tasks: list[asyncio.Task] = [group.create_task(func(**inputedargs)) for func in functions]
return True
提供的这两种方法都位于名为 的类中。Queue
运行此操作后,什么也没发生。发现没有执行提供的功能,而是执行了只有整个系统启动的功能。队列系统位于 src.core.transmitters.queue
中。.submit()
main.py
如何创建一个隔离的进程,该进程创建运行器函数()和变量元组中定义的所有其他函数的副本并运行它们?并且还有办法限制和使用?甚至可能吗?process_event_runner
functions
workers
asyncio
有关整个系统如何工作的任何问题,请随时在评论中提问,我会尽力回答。
正如Michael在评论中提出的,这是我在尝试导入指定函数(process_event_runner)时得到的循环导入错误的完整回溯:main.py
if __name__ == "__main__"
Traceback (most recent call last):
File "d:\Projects\RedSecurity\main.py", line 1, in <module>
from src.core.transmitters.queue import process_event_runner
File "d:\Projects\RedSecurity\src\core\transmitters\queue.py", line 3, in <module>
import src.sysConnector as syscon
File "d:\Projects\RedSecurity\src\sysConnector.py", line 9, in <module>
from src.core.transmitters.queue import Queue
ImportError: cannot import name 'Queue' from partially initialized module 'src.core.transmitters.queue' (most likely due to a circular import) (d:\Projects\RedSecurity\src\core\transmitters\queue.py)
加载程序位于规则之后,并且 start 方法设置为 。喜欢这个:if __main__...
spawn
from src.core.transmitters.queue import process_event_runner
if __name__ == "__main__":
import discord, sys, aiohttp, os, json, platform, glob, concurrent.futures, multiprocessing
sys.dont_write_bytecode = True
from discord.ext import commands
import src.system.colors as colors
import src.sysConnector as syscon
import src.setup as setup
class Bot(commands.AutoShardedBot):
#enabling all intents and setting up command_prefix
#initializing bot (intents, shards, prefix..)
def __init__(self, connector: syscon.SysConnector, setup: setup.Setup) -> None:
super().__init__(command_prefix="> ", intents=discord.Intents.all(), shard_count=1, help_command=None, max_messages=10000)
...
答: 暂无答案
评论
main.py
if __name__ == "__main__"
circular import error