提问人:aF. 提问时间:2/9/2010 最后编辑:starballaF. 更新时间:2/14/2023 访问量:89610
如何在Python中每60秒异步执行一个函数?
How to execute a function asynchronously every 60 seconds in Python?
问:
我想在 Python 上每 60 秒执行一次函数,但同时我不想被阻止。
如何异步执行此操作?
import threading
import time
def f():
print("hello world")
threading.Timer(3, f).start()
if __name__ == '__main__':
f()
time.sleep(20)
使用此代码,函数 f 在 20 秒的 time.time 内每 3 秒执行一次。 最后它给出了一个错误,我认为这是因为threading.timer尚未被取消。
我怎样才能取消它?
答:
为什么不创建一个专用线程,在其中放置一个简单的睡眠循环:
#!/usr/bin/env python
import time
while True:
# Your code here
time.sleep(60)
评论
您可以尝试线程。计时器类:http://docs.python.org/library/threading.html#timer-objects。
import threading
def f(f_stop):
# do something here ...
if not f_stop.is_set():
# call f() again in 60 seconds
threading.Timer(60, f, [f_stop]).start()
f_stop = threading.Event()
# start calling f now and every 60 sec thereafter
f(f_stop)
# stop the thread when needed
#f_stop.set()
评论
sys.exit(0)
f()
最简单的方法是创建一个每 60 秒运行一次的后台线程。一个简单的实现是:
import time
from threading import Thread
class BackgroundTimer(Thread):
def run(self):
while 1:
time.sleep(60)
# do something
# ... SNIP ...
# Inside your main thread
# ... SNIP ...
timer = BackgroundTimer()
timer.start()
显然,如果“做某事”需要很长时间,那么你需要在睡眠陈述中适应它。但是,60 秒是一个很好的近似值。
评论
__init__
time.time()
datetime.now()
这取决于你在此期间实际想做什么。线程是最通用和最不喜欢的方法;使用线程时,您应该注意线程的问题:并非所有(非 Python)代码都允许同时从多个线程访问,线程之间的通信应使用线程安全的数据结构完成,例如 ,您将无法从线程外部中断线程,并且在线程仍在运行时终止程序可能会导致解释器挂起或虚假回溯。Queue.Queue
通常有一种更简单的方法。如果您在 GUI 程序中执行此操作,请使用 GUI 库的计时器或事件功能。所有 GUI 都有这个。同样,如果你使用的是另一个事件系统,比如 Twisted 或其他服务器进程模型,你应该能够挂接到主事件循环中,让它定期调用你的函数。非线程方法确实会导致程序在函数挂起时被阻塞,但在函数调用之间不会被阻塞。
评论
我用谷歌搜索了一下,找到了 Python circuits Framework,它可以等待
特定事件。
电路方法包含触发和暂停直到响应功能,文档说:.callEvent(self, event, *channels)
将给定事件触发到指定通道并暂停执行 直到它被发送。此方法只能作为 参数添加到处理程序的顶层执行级别(例如 "").它有效地创建和返回 一个生成器,将由主循环调用,直到事件 已调度(参见 :func:)。
yield
yield self.callEvent(event)
circuits.core.handlers.handler
我希望你和我一样:)发现它有用
。
如果你想调用“on the clock”方法(例如,每小时一小时),你可以将以下想法与你选择的任何线程机制集成:
import time
def wait(n):
'''Wait until the next increment of n seconds'''
x = time.time()
time.sleep(n-(x%n))
print(time.asctime())
我认为重复运行线程的正确方法是下一个:
import threading
import time
def f():
print("hello world") # your code here
myThread.run()
if __name__ == '__main__':
myThread = threading.Timer(3, f) # timer is set to 3 seconds
myThread.start()
time.sleep(10) # it can be loop or other time consuming code here
if myThread.is_alive():
myThread.cancel()
使用此代码,函数 f 在 10 秒的 time.sleep(10) 内每 3 秒执行一次。最后,线程的运行被取消。
[删除了非异步版本]
要使用异步,您将使用 trio。我向所有询问异步 python 的人推荐 trio。特别是使用插座要容易得多。使用套接字,我有一个具有 1 个读取和 1 个写入函数的托儿所,写入函数从读取函数放置数据的位置写入数据;并等待发送。以下应用的工作原理是使用并打开一个托儿所,其中程序在循环中运行,每个循环之间都有一个循环,以便为应用的其余部分提供运行机会。这将在单个进程中运行程序,但您的计算机可以使用非异步方法处理 1500 个 TCP 连接,其中只有 255 个。trio.run(function,parameters)
await trio.sleep(60)
我还没有掌握取消语句,但我把它放在这意味着代码将等待 10 秒的时间比执行 60 秒的睡眠时间长,然后再进入下一个循环。move_on_after(70)
import trio
async def execTimer():
'''This function gets executed in a nursery simultaneously with the rest of the program'''
while True:
trio.move_on_after(70):
await trio.sleep(60)
print('60 Second Loop')
async def OneTime_OneMinute():
'''This functions gets run by trio.run to start the entire program'''
with trio.open_nursery() as nursery:
nursery.start_soon(execTimer)
nursery.start_soon(print,'do the rest of the program simultaneously')
def start():
'''You many have only one trio.run in the entire application'''
trio.run(OneTime_OneMinute)
if __name__ == '__main__':
start()
这将在托儿所中同时运行任意数量的功能。您可以将任何可取消语句用于程序其余部分继续运行的检查点。所有 trio 语句都是检查点,因此请经常使用它们。我没有测试这个应用程序;因此,如果有任何问题,请提出。 如您所见,trio是易于使用的功能的拥护者。它基于使用函数而不是对象,但如果您愿意,可以使用对象。 更多信息,请访问: [1]:https://trio.readthedocs.io/en/stable/reference-core.html
评论
cancel
setTimeout()
、setInterval()。
tkinter、twisted、gtk 的示例