提问人:nico 提问时间:10/6/2023 最后编辑:nico 更新时间:10/17/2023 访问量:62
Python 请求阻止发布 MQTT 消息
python request blocks publishing of mqtt messages
问:
我正在开发一个程序,可以从两个来源获取数据:
- 来自前端的传入 mqtt 消息 (paho) 和
- 来自 RestAPI(请求)的数据
传入的 mqtt 消息通过前端的用户输入触发对 RestAPI 的请求。
此外,它使用相同的连接/客户端通过 mqtt 以一秒的间隔向前端发送数据。
编辑:这种“推送方法”的另一种方法是,在启动/重新加载前端会话时从前端请求数据,如果仅更改数据,则从后端推送数据。但这并不能修复请求的阻止行为。
按照代码截取...
def main():
# Connect to MQTT broker
mqtt = mqttClient()
testClass = TestClassName(mqtt.client)
try:
# Start infinit loop to run mqtt client
mqtt.client.loop_start() # in this loop the callback for the triggering messages is running
# publish every 1 second
while True:
time.sleep(1)
testClass.uiPublish() # send data back to frontend
except KeyboardInterrupt:
logging.debug("Programm stopped by user")
mqtt.client.loop_stop()
mqtt.client.disconnect()
该类还包含 mqtt 消息和发布函数的回调。在其中一个回调中,将触发对 API 的请求函数。TestClassName
uiPublish()
发送的数据是类的一个属性,它由 API 请求更新。因此,这两个函数都必须有权访问此属性。TestClassName
# class TestClassName containes this function
def subscribeMqtt(self, topic):
self.mqttClient.subscribe(topic)
self.mqttClient.message_callback_add(topic, self.dotherequest)
一般来说,它可以工作,但有一个问题:对 API 的请求阻止了在 main() 函数的 while 循环中发送 mqtt 消息。该请求需要一些时间(最多 5 秒),这也会导致与 mqtt 代理断开连接/重新连接。
因此,前端在请求结束时已经过时,而且在代理上重新连接也不是一个好的解决方案。只要请求正在运行,前端就应该获取“旧”信息。在请求结束时,属性将更新,最多 1 秒后,数据将被发送到前端。dotherequest
我尝试了多种方法来处理这个问题,但我认为我有一些一般问题来理解 python 中多线程/异步背后的原理。我尝试设置发出请求的函数:
- 与 AIOHTTP 异步,
- 带螺纹的额外螺纹和
- 与 ThreadPool 异步
所有方法都不会改变发送 mqtt 消息的阻塞。我认为原因可能是因为最后我必须等待请求功能完成。我不会多次并行化同一个函数。我尝试并行化以完成程序的各个部分。
有什么想法/建议如何解决这个问题?
答:
所以,我想我理解了这个问题,并通过添加一个新线程找到了解决方案,如推荐的评论所示。
以我如何实现解决方案为例。
订阅 mqtt 主题并创建对函数的回调。self.dotherequest
# class TestClassName containes this function
def subscribeMqtt(self, topic):
self.mqttClient.subscribe(topic)
self.mqttClient.message_callback_add(topic, self.dotherequest)
在此函数中,将创建请求的新线程并启动。请求本身位于函数中 锁用于在写入数据的那一刻(在多个请求同时运行的情况下)保留对象的属性,也称为争用条件。self.functionWithRequest
def dotherequest(self, client, userdata, msg):
payload = json.loads(msg.payload.decode())
# creating a lock
lock = threading.Lock()
# New thread form request (IO)
t1 = threading.Thread(target=self.functionWithRequest, args=(payload, lock))
t1.start()
这是发生请求的函数。
def functionWithRequest(self, payload, lock):
headers = { "Content-Type": "application/json" }
data = {
"datapoint1": "data1",
"datapoint2": "data2",
"datapoint3": "data3"
}
response = requests.post(os.getenv('requestUrl'), json=data, headers=headers, verify='ca.pem', timeout=15)
if response.status_code == 200:
load = json.loads(response.text)
lock.acquire()
self.data = load['dataFromRequest']
lock.release()
else:
log.error('Request NOT successful. Error code {0}.'.format(response.status_code))
正如我所理解的 python 中线程和进程之间的区别,线程使用相同的 GIL,因此 GIL 一直在线程之间切换。最后,所有运行都在同一个 cpu 内核上运行,单个 python intepreter 正在运行。一个优点是所有线程都可以共享相同的变量/数据。
进程能够在多个 CPU 内核/多个 python 解释器上运行,这使得在进程完成后同步数据变得更加困难。优势是计算能力更强。
评论
time.sleep(0.001)