Python 请求阻止发布 MQTT 消息

python request blocks publishing of mqtt messages

提问人:nico 提问时间:10/6/2023 最后编辑:nico 更新时间:10/17/2023 访问量:62

问:

我正在开发一个程序,可以从两个来源获取数据:

  • 来自前端的传入 mqtt 消息 (paho) 和
  • 来自 RestAPI(请求)的数据

传入的 mqtt 消息通过前端的用户输入触发对 RestAPI 的请求。

此外,它使用相同的连接/客户端通过 mqtt 以一秒的间隔向前端发送数据。

编辑:这种“推送方法”的另一种方法是,在启动/重新加载前端会话时从前端请求数据,如果仅更改数据,则从后端推送数据。但这并不能修复请求的阻止行为。

按照代码截取...

def main():
    # Connect to MQTT broker
    mqtt = mqttClient()

    testClass = TestClassName(mqtt.client)

    try:
        # Start infinit loop to run mqtt client
        mqtt.client.loop_start() # in this loop the callback for the triggering messages is running
           
        # publish every 1 second
        while True:
            time.sleep(1)
            testClass.uiPublish() # send data back to frontend

    except KeyboardInterrupt:
        logging.debug("Programm stopped by user")
        mqtt.client.loop_stop()
        mqtt.client.disconnect()

该类还包含 mqtt 消息和发布函数的回调。在其中一个回调中,将触发对 API 的请求函数。TestClassNameuiPublish()

发送的数据是类的一个属性,它由 API 请求更新。因此,这两个函数都必须有权访问此属性。TestClassName

# class TestClassName containes this function
def subscribeMqtt(self, topic):
    self.mqttClient.subscribe(topic)
    self.mqttClient.message_callback_add(topic, self.dotherequest)

一般来说,它可以工作,但有一个问题:对 API 的请求阻止了在 main() 函数的 while 循环中发送 mqtt 消息。该请求需要一些时间(最多 5 秒),这也会导致与 mqtt 代理断开连接/重新连接。 因此,前端在请求结束时已经过时,而且在代理上重新连接也不是一个好的解决方案。只要请求正在运行,前端就应该获取“旧”信息。在请求结束时,属性将更新,最多 1 秒后,数据将被发送到前端。dotherequest

我尝试了多种方法来处理这个问题,但我认为我有一些一般问题来理解 python 中多线程/异步背后的原理。我尝试设置发出请求的函数:

  • 与 AIOHTTP 异步,
  • 带螺纹的额外螺纹和
  • 与 ThreadPool 异步

所有方法都不会改变发送 mqtt 消息的阻塞。我认为原因可能是因为最后我必须等待请求功能完成。我不会多次并行化同一个函数。我尝试并行化以完成程序的各个部分。

有什么想法/建议如何解决这个问题?

python MQTT 非阻塞

评论

0赞 Ulrich Eckhardt 10/6/2023
我看到的第一个问题是数据共享,这总是有问题的。如何同步访问权限?其次,需要长达 5 秒的处理,是以某种方式异步完成的吗?如果不是,它可能会保留 GIL,这会阻止所有其他处理。如果它在单独的线程中运行,情况也是如此,它不会自动释放 GIL。为此,您需要一个单独的进程(多线程与多处理)。您可以尝试将调用添加到内部紧密循环中以找出是否有帮助吗?time.sleep(0.001)
0赞 hardillb 10/6/2023
不能在消息回调中做阻塞任务
0赞 nico 10/6/2023
@UlrichEckhardt 共享数据问题:发送到前端的数据包由请求正在运行的函数更新。因此,基本思想是,前端在函数更新后立即获得更新的数据包。这种“推送方法”使得如果前端的会话重新启动/实现(前端在 REACT 中运行),前端就没有必要要求提供最新的数据包。如何从回调中“断开”阻止请求任务?
0赞 James Jithin 10/6/2023
使用 websocket 推送到客户端/浏览器。
0赞 nico 10/6/2023
@hardillb 我找到了这个线程,你对我的问题有一个更长的答案:stackoverflow.com/questions/63504565/...。我开始研究回调的线程。

答:

0赞 nico 10/17/2023 #1

所以,我想我理解了这个问题,并通过添加一个新线程找到了解决方案,如推荐的评论所示。

以我如何实现解决方案为例。

订阅 mqtt 主题并创建对函数的回调。self.dotherequest

# class TestClassName containes this function
def subscribeMqtt(self, topic):
    self.mqttClient.subscribe(topic)
    self.mqttClient.message_callback_add(topic, self.dotherequest)

在此函数中,将创建请求的新线程并启动。请求本身位于函数中 锁用于在写入数据的那一刻(在多个请求同时运行的情况下)保留对象的属性,也称为争用条件。self.functionWithRequest

def dotherequest(self, client, userdata, msg):
        payload = json.loads(msg.payload.decode())   
        # creating a lock 
        lock = threading.Lock() 
        # New thread form request (IO)
        t1 = threading.Thread(target=self.functionWithRequest, args=(payload, lock)) 
        t1.start()

这是发生请求的函数。

def functionWithRequest(self, payload, lock):   
    headers = { "Content-Type": "application/json" }
    data = {
        "datapoint1": "data1",
        "datapoint2": "data2",
        "datapoint3": "data3"
        }
    
    response = requests.post(os.getenv('requestUrl'), json=data, headers=headers, verify='ca.pem', timeout=15)

    if response.status_code == 200:
        load = json.loads(response.text)
            lock.acquire() 
            self.data = load['dataFromRequest']
            lock.release()
    else:
        log.error('Request NOT successful. Error code {0}.'.format(response.status_code))

正如我所理解的 python 中线程和进程之间的区别,线程使用相同的 GIL,因此 GIL 一直在线程之间切换。最后,所有运行都在同一个 cpu 内核上运行,单个 python intepreter 正在运行。一个优点是所有线程都可以共享相同的变量/数据。

进程能够在多个 CPU 内核/多个 python 解释器上运行,这使得在进程完成后同步数据变得更加困难。优势是计算能力更强。