提问人:Commodore 64 提问时间:11/2/2023 最后编辑:Commodore 64 更新时间:11/2/2023 访问量:52
使用需要很长时间的函数时停止代码停止
Stopping the code from halting when using a function that takes a long time
问:
我正在尝试使用连接到 Pico W 的 Adafruit IO 作为某些 Neopixels 的遥控器。读取该值大约需要 0.25 秒,这会破坏需要延迟的 LED 模式。不使用 time.sleep 进行此延迟不会改变任何事情 - 因为无论如何,读取提要都会停止代码。我尝试在循环中、函数中、使用 asyncio 的异步函数中读取它,但所有这些都没有解决问题 - 它们什么也没改变。我可能做错了 asyncio 部分,但我不知道也找不到好的例子。
有谁知道如何使读取发生得更快,或者在后台发生。
这是我在 asyncio 中使用的代码:
import os
import ssl
import wifi
import socketpool
import microcontroller
import adafruit_requests
from adafruit_io.adafruit_io import IO_HTTP, AdafruitIO_RequestError
wifi.radio.connect(os.getenv('CIRCUITPY_WIFI_SSID'),
os.getenv('CIRCUITPY_WIFI_PASSWORD'))
aio_username = os.getenv('aio_username')
aio_key = os.getenv('aio_key')
pool = socketpool.SocketPool(wifi.radio)
requests = adafruit_requests.Session(pool, ssl.create_default_context())
io = IO_HTTP(aio_username, aio_key, requests)
print("connected to io")
statefeed = io.get_feed(<insert feed here>)
async def statefunc():
global state
state = io.receive_data(statefeed["key"])["value"] # This is the part that takes 0.25 seconds.
async def main():
asyncstate = asyncio.create_task(statefunc())
await asyncio.gather(asyncstate)
while True:
asyncio.run(main())
if state == "0":
# do a thing
if state == "1":
# do another thing```
答:
-1赞
Polyvios P
11/2/2023
#1
为了解决这个问题,你可以使用不同的方法,而不依赖于asyncio,因为微控制器不支持真正的并发线程。
下面是一种使用基于计时器的检查来降低轮询 Adafruit IO 的频率的方法,从而使您的 LED 模式平稳运行:
使用计时器进行轮询:您可以设置计时器,而不是在每次循环迭代中不断轮询 Adafruit IO。当计时器到期时,轮询 Adafruit IO、更新状态并重置计时器。
下面介绍如何构建代码:
import os
import ssl
import wifi
import socketpool
import microcontroller
import adafruit_requests
from adafruit_io.adafruit_io import IO_HTTP, AdafruitIO_RequestError
import time
# Connect to WiFi
wifi.radio.connect(os.getenv('CIRCUITPY_WIFI_SSID'),
os.getenv('CIRCUITPY_WIFI_PASSWORD'))
# Adafruit IO credentials
aio_username = os.getenv('aio_username')
aio_key = os.getenv('aio_key')
# Set up socket pool and HTTP session for Adafruit IO
pool = socketpool.SocketPool(wifi.radio)
requests = adafruit_requests.Session(pool, ssl.create_default_context())
io = IO_HTTP(aio_username, aio_key, requests)
print("Connected to Adafruit IO")
# Fetch the feed
statefeed = io.get_feed(<insert feed here>)
# Polling configuration
POLL_INTERVAL = 5 # Poll every 5 seconds
last_poll_time = time.monotonic()
state = None # Initial state
def update_state():
"""Update the state by polling Adafruit IO."""
global state
try:
state = io.receive_data(statefeed["key"])["value"]
except AdafruitIO_RequestError:
print("Error fetching state")
state = None
while True:
current_time = time.monotonic()
# If the poll interval has passed, update the state
if current_time - last_poll_time >= POLL_INTERVAL:
update_state()
last_poll_time = current_time
# Check the state and perform the corresponding action
if state == "0":
# do a thing (e.g., light up LEDs in a certain pattern)
pass
elif state == "1":
# do another thing (e.g., light up LEDs in a different pattern)
pass
# Run your LED patterns or other non-blocking code here
# ...
将“在此处插入 Feed”替换为相应的 Feed 名称或 ID。此代码将每 5 秒轮询一次 Adafruit IO 以更新状态。在这些轮询间隔之间,您可以执行 LED 模式或其他非阻塞任务,而不会被 IO 提取操作中断。根据需要调整POLL_INTERVAL。
评论
0赞
Commodore 64
11/2/2023
这对我来说并不真正有效,因为我不仅希望系统的控制更加即时,而且每 5 秒动画就会冻结一段可见的时间。这些 LED 用于圣诞树 - 在序列中突然停止看起来不太好。
评论
asyncstate
asyncstate
<task>
Promise
future
ready
get
asyncio.create_task()