提问人:dawid 提问时间:11/15/2023 更新时间:11/15/2023 访问量:28
纯Python双向多线程通信的架构建议
architecture suggestion for pure python two way multi threaded communication
问:
我当前的环境非常典型,但似乎很难找到针对此方案的质量建议。没有性能要求,为了便于使用,不应安装其他软件(没有 rabbitmq / zeromq 等)
有一个协调器线程接收/发送数据并将其发送到所有组件: 0. “coordinator”事件处理程序(输入和输出)
有三个组件线程:
- “UI”烧瓶前端(输出)
- MQTTY Subscirber / Publisher (输入和输出)
- discord.py (asyncio) (输入和输出)
对于 (0<--1&2&3) 多进程队列,组件将事件放入队列以供协调器 事件 = 队列。队列()
#coordinator
while True:
try:
e = events.get()
logging.info(f'event {e}')
# .. do stuff .. #
finally:
events .task_done()
#any component
events.put_nowait('foobar')
对于 0-->1,使用多进程管理器,协调器设置要显示的 UI 值,这工作正常。
manager = Manager()
ui_inp = manager.dict()
Thread(target=endpoint_frontend.run,
args=(events, ui_inp),
name='frontend',
daemon=True).start()
#component
@app.route("/")
def status():
foo = ui_inp['foo'].copy()
return jsonify(foo)
对于 0-->2 使用多进程队列,协调器设置值,似乎没问题
mqtt_inp = queue.Queue()
Thread(target=endpoints_mqtt.run,
args=(events, mqtt_inp),
name='mqtt',
daemon=True).start()
#component:
while Connected:
try:
event = input_queue.get()
logging.warn(f"mqtt event {event}")
finally:
input_queue.task_done()
对于 0-->3 使用 asyncio 队列,这是可怕的 asyncio。队列不是线程安全的。
discord_input = asyncio.Queue()
Thread(target=endpoint_discord.run,
args=(events,discord_input),
name='bot',
daemon=True).start()
# component, discord class with one Cog
class MyCog(commands.Cog):
def __init__(self, bot):
self.bot = bot
@tasks.loop(seconds=0.1)
async def events(self):
while not self.bot.is_closed():
try:
event = await self.bot.input_queue.get()
finally:
self.bot.input_queue.task_done()
您能否推荐一种常见的干净的pythonic方法来处理这种协调器与组件的通信?
答: 暂无答案
评论