使用需要很长时间的函数时停止代码停止

Stopping the code from halting when using a function that takes a long time

提问人:Commodore 64 提问时间:11/2/2023 最后编辑:Commodore 64 更新时间:11/2/2023 访问量:52

问:

我正在尝试使用连接到 Pico W 的 Adafruit IO 作为某些 Neopixels 的遥控器。读取该值大约需要 0.25 秒,这会破坏需要延迟的 LED 模式。不使用 time.sleep 进行此延迟不会改变任何事情 - 因为无论如何,读取提要都会停止代码。我尝试在循环中、函数中、使用 asyncio 的异步函数中读取它,但所有这些都没有解决问题 - 它们什么也没改变。我可能做错了 asyncio 部分,但我不知道也找不到好的例子。

有谁知道如何使读取发生得更快,或者在后台发生。

这是我在 asyncio 中使用的代码:

import os
import ssl
import wifi
import socketpool
import microcontroller
import adafruit_requests
from adafruit_io.adafruit_io import IO_HTTP, AdafruitIO_RequestError

wifi.radio.connect(os.getenv('CIRCUITPY_WIFI_SSID'),
                   os.getenv('CIRCUITPY_WIFI_PASSWORD'))
aio_username = os.getenv('aio_username')
aio_key = os.getenv('aio_key')

pool = socketpool.SocketPool(wifi.radio)
requests = adafruit_requests.Session(pool, ssl.create_default_context())
io = IO_HTTP(aio_username, aio_key, requests)
print("connected to io")

statefeed = io.get_feed(<insert feed here>)

async def statefunc():
    global state
    state = io.receive_data(statefeed["key"])["value"] # This is the part that takes 0.25 seconds.
    
async def main():
    asyncstate = asyncio.create_task(statefunc())
    await asyncio.gather(asyncstate)

while True:
    asyncio.run(main())

    if state == "0":
        # do a thing

    if state == "1":
       # do another thing```
Python 多线程异 IoT Adafruit

评论

0赞 rioV8 11/2/2023
看起来您确实使用异步函数同步执行,查询它是否已经有值,如果没有,请再次尝试下一次循环迭代,然后启动新的异步asyncstate
0赞 Commodore 64 11/2/2023
@rioV8 通过查询,你的意思是得到 的值?如果是这样,它在打印时确实有一个值。().该函数确实完成了它的工作 - 它只是停止所有代码,而不仅仅是它的功能。另外,我在 RP2040 上这样做。(皮克W)。该董事会还能支持这一点吗?asyncstate<task>
0赞 rioV8 11/2/2023
使用 A 或 A 可以询问对象函数是否已被调用并且值是否可用,如果您随后立即获得该值而不会阻塞,则在结果中找到相同的功能Promisefuturereadygetasyncio.create_task()

答:

-1赞 Polyvios P 11/2/2023 #1

为了解决这个问题,你可以使用不同的方法,而不依赖于asyncio,因为微控制器不支持真正的并发线程。

下面是一种使用基于计时器的检查来降低轮询 Adafruit IO 的频率的方法,从而使您的 LED 模式平稳运行:

使用计时器进行轮询:您可以设置计时器,而不是在每次循环迭代中不断轮询 Adafruit IO。当计时器到期时,轮询 Adafruit IO、更新状态并重置计时器。

下面介绍如何构建代码:

import os
import ssl
import wifi
import socketpool
import microcontroller
import adafruit_requests
from adafruit_io.adafruit_io import IO_HTTP, AdafruitIO_RequestError
import time

# Connect to WiFi
wifi.radio.connect(os.getenv('CIRCUITPY_WIFI_SSID'),
                   os.getenv('CIRCUITPY_WIFI_PASSWORD'))

# Adafruit IO credentials
aio_username = os.getenv('aio_username')
aio_key = os.getenv('aio_key')

# Set up socket pool and HTTP session for Adafruit IO
pool = socketpool.SocketPool(wifi.radio)
requests = adafruit_requests.Session(pool, ssl.create_default_context())
io = IO_HTTP(aio_username, aio_key, requests)

print("Connected to Adafruit IO")

# Fetch the feed
statefeed = io.get_feed(<insert feed here>)

# Polling configuration
POLL_INTERVAL = 5  # Poll every 5 seconds
last_poll_time = time.monotonic()
state = None  # Initial state

def update_state():
    """Update the state by polling Adafruit IO."""
    global state
    try:
        state = io.receive_data(statefeed["key"])["value"]
    except AdafruitIO_RequestError:
        print("Error fetching state")
        state = None

while True:
    current_time = time.monotonic()
    
    # If the poll interval has passed, update the state
    if current_time - last_poll_time >= POLL_INTERVAL:
        update_state()
        last_poll_time = current_time

    # Check the state and perform the corresponding action
    if state == "0":
        # do a thing (e.g., light up LEDs in a certain pattern)
        pass
    elif state == "1":
        # do another thing (e.g., light up LEDs in a different pattern)
        pass

    # Run your LED patterns or other non-blocking code here
    # ...

“在此处插入 Feed”替换为相应的 Feed 名称或 ID。此代码将每 5 秒轮询一次 Adafruit IO 以更新状态。在这些轮询间隔之间,您可以执行 LED 模式或其他非阻塞任务,而不会被 IO 提取操作中断。根据需要调整POLL_INTERVAL。

评论

0赞 Commodore 64 11/2/2023
这对我来说并不真正有效,因为我不仅希望系统的控制更加即时,而且每 5 秒动画就会冻结一段可见的时间。这些 LED 用于圣诞树 - 在序列中突然停止看起来不太好。