Python 与 concurrent.futures.ProcessPoolExecutor(或多处理)的并行性。Pool) 和 AsyncIO

Python parallelism with concurrent.futures.ProcessPoolExecutor (or multiprocessing.Pool) and AsyncIO

提问人:Red 提问时间:11/2/2023 最后编辑:Red 更新时间:11/7/2023 访问量:90

问:

目前,我正在开发一个项目,该项目可以监听每个 API 请求,对其进行检查并将其发送回去。可悲的是,API 设法使我的单线程 AsyncIO 和多线程尝试过载,但 CPU 从未达到负载的 20%。

所以我的想法是使用多处理并完全加载 (70-90%) CPU。最好的方法(我认为)是使用 or 因为我可以简单地设置和处理 CPU 可能处理的尽可能多的请求。但我失败了。concurrent.futures.ProcessPoolExecutormultiprocessing.Poolmax_workersmax_tasks_per_childasyncio

我目前的问题是在我的自定义队列系统中创建进程。确切地说,我不完全知道如何创建一个隔离的进程,该进程将:

  • 生成循环asyncio
  • 运行具有 3 秒超时的异步函数asyncio.TaskGroup()
  • 运行所有任务
  • 返回数据

基本上,我想创建所需检查函数的副本(队列系统提供它们),在隔离进程中运行它们并返回数据。

方法如下:add_queue

async def add_queue(self, function: str, id: int | None, **kwargs) -> None:
    functions: tuple | None = self.filter.get(function)
    process_executor: syscon.concurrent.futures.ProcessPoolExecutor = self.CONNECTOR.process_executor

    if not functions:
        self.logger.log(level="DEBUG", message=f"@Queue | No match found for {function}.")
    elif guild_id is None:
        self.logger.log(level="DEBUG", message=f"@Queue | No ID found in {function} request.")
    elif process_executor is None:
        self.logger.log(level="ERROR", message="ProcessPoolExecutor does NOT exist!")
    else:
        try:
            self.logger.log(level="DEBUG", message=f"@Queue | Submitting task to ProcessPoolExecutor.")
            self.process_executor.submit(self.process_event_runner, functions=functions, id=id, **kwargs) 
        except Exception as error:
            self.logger.log(level="ERROR", message=f"@Queue | {type(error).__name__}: {error}")

以及被调用的方法:

def process_event_runner(self, functions: tuple, id: int, **inputedargs) -> None:
    self.logger.log(level="DEBUG", message=f"@Queue.process_event_runner | Runner executing.")
    event_loop: asyncio.AbstractEventLoop = asyncio.new_event_loop()
    try:
        event_loop.run_until_complete(asyncio.wait_for(async_task_worker(functions=functions, loop=event_loop, id=id, **inputedargs), timeout=3))
    except asyncio.TimeoutError:
        self.logger.log(level="WARNING", message=f"@Queue.process_event_runner | Task timed out. Terminating.")
    except asyncio.CancelledError:
        self.logger.log(level="WARNING", message=f"@Queue.process_event_runner | Task cancelled. Terminating.")
    except ExceptionGroup as groupError:
        self.logger.log(level="ERROR", message=f"@Queue.process_event_runner | Exception Group Error: {', '.join([f'SubE {num}: {exception}' for num, exception in enumerate(groupError.exceptions, 1)])}")
    except Exception as error:
        self.logger.log(level="ERROR", message=f"@Queue.process_event_runner | {type(error).__name__}: {error}")
    
    async def async_task_worker(functions, loop: asyncio.AbstractEventLoop, id: int, **inputedargs):
        asyncio.set_event_loop(loop)     
        
        async with asyncio.TaskGroup() as group:
            tasks: list[asyncio.Task] = [group.create_task(func(**inputedargs)) for func in functions]
        return True

提供的这两种方法都位于名为 的类中。Queue

运行此操作后,什么也没发生。发现没有执行提供的功能,而是执行了只有整个系统启动的功能。队列系统位于 src.core.transmitters.queue 中。.submit()main.py

如何创建一个隔离的进程,该进程创建运行器函数()和变量元组中定义的所有其他函数的副本并运行它们?并且还有办法限制和使用?甚至可能吗?process_event_runnerfunctionsworkersasyncio

有关整个系统如何工作的任何问题,请随时在评论中提问,我会尽力回答。

正如Michael在评论中提出的,这是我在尝试导入指定函数(process_event_runner)时得到的循环导入错误的完整回溯:main.pyif __name__ == "__main__"

Traceback (most recent call last):
  File "d:\Projects\RedSecurity\main.py", line 1, in <module>
    from src.core.transmitters.queue import process_event_runner
  File "d:\Projects\RedSecurity\src\core\transmitters\queue.py", line 3, in <module>
    import src.sysConnector as syscon
  File "d:\Projects\RedSecurity\src\sysConnector.py", line 9, in <module>
    from src.core.transmitters.queue import Queue
ImportError: cannot import name 'Queue' from partially initialized module 'src.core.transmitters.queue' (most likely due to a circular import) (d:\Projects\RedSecurity\src\core\transmitters\queue.py)

加载程序位于规则之后,并且 start 方法设置为 。喜欢这个:if __main__...spawn

from src.core.transmitters.queue import process_event_runner

if __name__ == "__main__":
    import discord, sys, aiohttp, os, json, platform, glob, concurrent.futures, multiprocessing
    sys.dont_write_bytecode = True
    from discord.ext import commands
    import src.system.colors as colors
    import src.sysConnector as syscon
    import src.setup as setup
    
    class Bot(commands.AutoShardedBot):
        #enabling all intents and setting up command_prefix
        #initializing bot (intents, shards, prefix..)
        def __init__(self, connector: syscon.SysConnector, setup: setup.Setup) -> None:
            super().__init__(command_prefix="> ", intents=discord.Intents.all(), shard_count=1, help_command=None, max_messages=10000)
            ...
并行处理 python-asyncio 多处理 python-3.12

评论

0赞 Michael Butscher 11/2/2023
如果这是在 Windows(或 MacOS,我认为)上执行的,子进程的 start 方法是“spawn”。您是否阅读了有关此方法特殊要求的文档
0赞 Red 11/3/2023
我有。多处理和 ProcessPoolExecutor。Linux 和 Windows 系统之间存在差异,但老实说,找不到解决方案/理解文档。
0赞 Michael Butscher 11/3/2023
一件重要的事情是将活动代码(通常是除了类、函数和常量的导入和定义之外的所有内容)放在一个块中,以防止它在生成子进程时执行。main.pyif __name__ == "__main__"
0赞 Red 11/5/2023
我不可能导入它们,因为它会触发 .还有其他产卵方式吗?我目前正在使用 Windows,但我确实计划切换到 Linuxcircular import error
0赞 Michael Butscher 11/5/2023
导入到底有什么问题?将错误的完整回溯显示为格式正确的文本(格式化为代码)以及问题中的相关代码。

答: 暂无答案