Asked by: Stefano Borini · Asked: 8/6/2009 · Last edited by: Benyamin Jafari, Stefano Borini · Updated: 7/16/2023 · Views: 324,951
Asynchronous method call in Python?
Q:
I was wondering if there is any library for asynchronous method calls in Python. It would be great if you could do something like
@async
def longComputation():
    <code>

token = longComputation()
token.registerCallback(callback_function)
# alternative, polling
while not token.finished():
    doSomethingElse()
    if token.finished():
        result = token.result()
Or to call a non-asynchronous routine asynchronously:
def longComputation()
    <code>

token = asynccall(longComputation())
It would be great to have a more refined strategy, as if it were native in the language core. Has this been considered?

A:
It is not in the language core, but a very mature library that does what you want is Twisted. It introduces the Deferred object, to which you can attach callbacks or error handlers ("errbacks"). A Deferred is basically a "promise" that a function will eventually have a result.
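A minimal sketch of that idea (assuming Twisted is installed; deferToThread is used here to run a plain blocking function off the main thread and deliver its result through a Deferred):

from twisted.internet import reactor, threads

def long_computation():
    # Stand-in for some slow, blocking work.
    return sum(range(10_000_000))

def on_result(result):
    print("got result:", result)
    reactor.stop()

def on_error(failure):
    failure.printTraceback()
    reactor.stop()

def start():
    # deferToThread runs the function in Twisted's thread pool and returns a
    # Deferred that fires with the result (or a Failure) when it completes.
    d = threads.deferToThread(long_computation)
    d.addCallback(on_result)
    d.addErrback(on_error)

reactor.callWhenRunning(start)
reactor.run()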
Something like this:
import threading
thr = threading.Thread(target=foo, args=(), kwargs={})
thr.start() # Will run "foo"
....
thr.is_alive() # Will return whether foo is running currently
....
thr.join() # Will wait till "foo" is done
See the documentation at https://docs.python.org/library/threading.html for details.
Is there any reason not to use threads? You can use the threading class. Instead of a finished() function, use isAlive(). The result() function could join() the thread and retrieve the result. And, if you can, override the run() and __init__ functions to call the function specified in the constructor and save the value somewhere in the instance of the class.
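A minimal sketch of that idea (ResultThread and long_computation are illustrative names, not anything from the standard library):

import threading

class ResultThread(threading.Thread):
    """Thread subclass that remembers the target function's return value."""
    def __init__(self, fnc, *args, **kwargs):
        super().__init__()
        self._fnc = fnc
        self._args = args
        self._kwargs = kwargs
        self.result = None

    def run(self):
        self.result = self._fnc(*self._args, **self._kwargs)

def long_computation():
    return 42

thr = ResultThread(long_computation)
thr.start()
# ... do something else; thr.is_alive() plays the role of "not finished()" ...
thr.join()         # wait for completion
print(thr.result)  # 42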
You can use the multiprocessing module, added in Python 2.6. You can use pools of processes and then get results asynchronously with:
apply_async(func[, args[, kwds[, callback]]])
For example:
import time
from multiprocessing import Pool

def postprocess(result):
    print("finished: %s" % result)

def f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=1)                                   # Start a worker process.
    result = pool.apply_async(f, [10], callback=postprocess)  # Evaluate "f(10)" asynchronously, calling postprocess when finished.
    print("waiting...")
    time.sleep(1)
This is just one alternative. The module provides many facilities to achieve what you want. Also, it is really easy to build a decorator from this.
Comment: if you want threads instead of processes, multiprocessing.dummy provides the same Pool API:
from multiprocessing.dummy import Pool
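A minimal sketch of that idea (the worker function and pool size are just for illustration):

from multiprocessing.dummy import Pool  # same Pool API, backed by threads

def f(x):
    return x * x

pool = Pool(processes=4)
async_result = pool.apply_async(f, (10,))  # returns immediately with an AsyncResult
# ... do other work ...
print(async_result.get(timeout=5))         # blocks until ready, prints 100
pool.close()
pool.join()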
My solution is:
import threading

class TimeoutError(RuntimeError):
    pass

class AsyncCall(object):
    def __init__(self, fnc, callback=None):
        self.Callable = fnc
        self.Callback = callback

    def __call__(self, *args, **kwargs):
        self.Thread = threading.Thread(target=self.run, name=self.Callable.__name__, args=args, kwargs=kwargs)
        self.Thread.start()
        return self

    def wait(self, timeout=None):
        self.Thread.join(timeout)
        if self.Thread.is_alive():
            raise TimeoutError()
        else:
            return self.Result

    def run(self, *args, **kwargs):
        self.Result = self.Callable(*args, **kwargs)
        if self.Callback:
            self.Callback(self.Result)

class AsyncMethod(object):
    def __init__(self, fnc, callback=None):
        self.Callable = fnc
        self.Callback = callback

    def __call__(self, *args, **kwargs):
        return AsyncCall(self.Callable, self.Callback)(*args, **kwargs)

def Async(fnc=None, callback=None):
    if fnc is None:
        def AddAsyncCallback(fnc):
            return AsyncMethod(fnc, callback)
        return AddAsyncCallback
    else:
        return AsyncMethod(fnc, callback)
And it works exactly as requested:
@Async
def fnc():
    pass
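A slightly fuller usage sketch of the classes above (the callback and the timeout value are illustrative):

@Async(callback=print)          # print the result when the call finishes
def long_computation(x):
    return x * x

call = long_computation(10)     # returns an AsyncCall token immediately
result = call.wait(timeout=5)   # blocks; raises TimeoutError if not done in 5 s
print(result)                   # 100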
You can implement a decorator to make your functions asynchronous, though that's a bit tricky. The multiprocessing module is full of little quirks and seemingly arbitrary restrictions; all the more reason to encapsulate it behind a friendly interface, though. (Note that async has been a reserved keyword since Python 3.7, so on a modern interpreter the decorator below needs a different name.)
from inspect import getmodule
from multiprocessing import Pool

def async(decorated):
    r'''Wraps a top-level function around an asynchronous dispatcher.

    When the decorated function is called, a task is submitted to a
    process pool, and a future object is returned, providing access to an
    eventual return value.

    The future object has a blocking get() method to access the task
    result: it will return immediately if the job is already done, or block
    until it completes.

    This decorator won't work on methods, due to limitations in Python's
    pickling machinery (in principle methods could be made pickleable, but
    good luck on that).
    '''
    # Keeps the original function visible from the module global namespace,
    # under a name consistent to its __name__ attribute. This is necessary for
    # the multiprocessing pickling machinery to work properly.
    module = getmodule(decorated)
    decorated.__name__ += '_original'
    setattr(module, decorated.__name__, decorated)

    def send(*args, **opts):
        return async.pool.apply_async(decorated, args, opts)

    return send
The code below illustrates the decorator's usage:
@async
def printsum(uid, values):
    summed = 0
    for value in values:
        summed += value

    print("Worker %i: sum value is %i" % (uid, summed))

    return (uid, summed)

if __name__ == '__main__':
    from random import sample

    # The process pool must be created inside __main__.
    async.pool = Pool(4)

    p = range(0, 1000)
    results = []
    for i in range(4):
        result = printsum(i, sample(p, 100))
        results.append(result)

    for result in results:
        print("Worker %i: sum value is %i" % result.get())
In a real-world case I would elaborate a bit more on the decorator, providing some way to turn it off for debugging (while keeping the future interface in place), or maybe a facility for dealing with exceptions; but I think this demonstrates the principle well enough.
Just:
import threading, time

def f():
    print("f started")
    time.sleep(3)
    print("f finished")

threading.Thread(target=f).start()
You can use eventlet. It lets you write what appears to be synchronous code, but have it operate asynchronously over the network.
Here's an example of a super-minimal crawler:
# Python 2 example (urllib2 and the print statement are Python 2 only).
urls = ["http://www.google.com/intl/en_ALL/images/logo.gif",
        "https://wiki.secondlife.com/w/images/secondlife.jpg",
        "http://us.i1.yimg.com/us.yimg.com/i/ww/beta/y3.gif"]

import eventlet
from eventlet.green import urllib2

def fetch(url):
    return urllib2.urlopen(url).read()

pool = eventlet.GreenPool()

for body in pool.imap(fetch, urls):
    print "got body", len(body)
Something like this works for me; you can then call the function, and it will dispatch itself onto a new thread.
import time
from _thread import start_new_thread  # the "thread" module was renamed to "_thread" in Python 3

def dowork(asynchronous=True):
    if asynchronous:
        args = (False,)                 # args must be a tuple, note the trailing comma
        start_new_thread(dowork, args)  # call itself on a new thread
    else:
        while True:
            # do something...
            time.sleep(60)  # sleep for a minute
You can use asyncio for asynchronous functions: with generator-based coroutines as of Python 3.4, and with the dedicated async/await syntax as of Python 3.5.
import asyncio
import datetime
Enhanced generator syntax (note that @asyncio.coroutine was deprecated in Python 3.8 and removed in 3.11):
@asyncio.coroutine
def display_date(loop):
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        yield from asyncio.sleep(1)

loop = asyncio.get_event_loop()
# Blocking call which returns when the display_date() coroutine is done
loop.run_until_complete(display_date(loop))
loop.close()
New async/await syntax:
async def display_date(loop):
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        await asyncio.sleep(1)

loop = asyncio.get_event_loop()
# Blocking call which returns when the display_date() coroutine is done
loop.run_until_complete(display_date(loop))
loop.close()
You can use concurrent.futures (added in Python 3.2).
import time
from concurrent.futures import ThreadPoolExecutor

def long_computation(duration):
    for x in range(0, duration):
        print(x)
        time.sleep(1)
    return duration * 2

print('Use polling')
with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(long_computation, 5)
    while not future.done():
        print('waiting...')
        time.sleep(0.5)

    print(future.result())

print('Use callback')
executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(long_computation, 5)
future.add_done_callback(lambda f: print(f.result()))

print('waiting for callback')

executor.shutdown(False)  # non-blocking

print('shutdown invoked')
You can use Process. If you want it to run forever, put a while loop (as you would for networking) inside your function:
from multiprocessing import Process

def foo():
    while 1:
        pass  # Do something

p = Process(target=foo)
p.start()
If you only want to run it once, do it like this:
from multiprocessing import Process

def foo():
    pass  # Do something

p = Process(target=foo)
p.start()
p.join()
The native Python way of making asynchronous calls in 2021 with Python 3.9, which also works in the Jupyter / IPython kernel
Camabeh's answer is the way to go since Python 3.3.
async def display_date(loop):
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        await asyncio.sleep(1)

loop = asyncio.get_event_loop()
# Blocking call which returns when the display_date() coroutine is done
loop.run_until_complete(display_date(loop))
loop.close()
This will work in Jupyter Notebook / Jupyter Lab, but it throws an error:
RuntimeError: This event loop is already running
Because IPython already uses an event loop, we need something called a nested asynchronous loop, which is not yet implemented in Python. Luckily there is nest_asyncio to deal with the issue. All you need to do is:
!pip install nest_asyncio # use ! within Jupyter Notebook, else pip install in shell
import nest_asyncio
nest_asyncio.apply()
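For illustration, a minimal sketch of what this enables (the coroutine here is just an example):

import asyncio
import nest_asyncio

nest_asyncio.apply()

async def main():
    await asyncio.sleep(1)
    return "done"

# Inside Jupyter/IPython this would normally raise
# "RuntimeError: This event loop is already running";
# with nest_asyncio applied, the running loop can be re-entered.
loop = asyncio.get_event_loop()
print(loop.run_until_complete(main()))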
(Based on this thread.)
Another error is thrown only if you call loop.close(), as it probably refers to IPython's main loop:
RuntimeError: Cannot close a running event loop
As soon as someone answers this GitHub issue, I will update this answer.
The newer way to run asyncio in Python 3.7 and later is to use asyncio.run() instead of creating a loop, calling loop.run_until_complete() on it, and closing it:
import asyncio
import datetime

async def display_date(delay):
    loop = asyncio.get_running_loop()
    end_time = loop.time() + delay
    while True:
        print("Blocking...", datetime.datetime.now())
        await asyncio.sleep(1)
        if loop.time() > end_time:
            print("Done.")
            break

asyncio.run(display_date(5))