纯Python双向多线程通信的架构建议

architecture suggestion for pure python two way multi threaded communication

提问人:dawid 提问时间:11/15/2023 更新时间:11/15/2023 访问量:28

问:

我当前的环境非常典型,但似乎很难找到针对此方案的质量建议。没有性能要求,为了便于使用,不应安装其他软件(没有 rabbitmq / zeromq 等)

有一个协调器线程接收/发送数据并将其发送到所有组件: 0. “coordinator”事件处理程序(输入和输出)

有三个组件线程:

  1. “UI”烧瓶前端(输出)
  2. MQTTY Subscirber / Publisher (输入和输出)
  3. discord.py (asyncio) (输入和输出)

对于 (0<--1&2&3) 多进程队列,组件将事件放入队列以供协调器 事件 = 队列。队列()

#coordinator
    while True:
    try:
        e = events.get()
        logging.info(f'event {e}')
        # .. do stuff .. #
    finally:
        events .task_done()

#any component
events.put_nowait('foobar')

对于 0-->1,使用多进程管理器,协调器设置要显示的 UI 值,这工作正常。

manager = Manager()
ui_inp = manager.dict()

Thread(target=endpoint_frontend.run,
       args=(events, ui_inp),
       name='frontend',
       daemon=True).start()

#component
@app.route("/")
def status():
    foo = ui_inp['foo'].copy()
    return jsonify(foo)

对于 0-->2 使用多进程队列,协调器设置值,似乎没问题

mqtt_inp = queue.Queue()

Thread(target=endpoints_mqtt.run,
       args=(events, mqtt_inp),
       name='mqtt',
       daemon=True).start()

#component:
while Connected:
    try:
        event = input_queue.get()
        logging.warn(f"mqtt event {event}")
    finally:
        input_queue.task_done()

对于 0-->3 使用 asyncio 队列,这是可怕的 asyncio。队列不是线程安全的。

discord_input = asyncio.Queue()

Thread(target=endpoint_discord.run,
       args=(events,discord_input),
       name='bot',
       daemon=True).start()

# component, discord class with one Cog
class MyCog(commands.Cog):
    def __init__(self, bot):
        self.bot = bot

    @tasks.loop(seconds=0.1)
    async def events(self):
        while not self.bot.is_closed():
            try:
                event = await self.bot.input_queue.get()
            finally:
                self.bot.input_queue.task_done()

您能否推荐一种常见的干净的pythonic方法来处理这种协调器与组件的通信?

python 多线程 烧瓶 discord.py python-asyncio

评论


答: 暂无答案