Python 中的异步方法调用?

Asynchronous method call in Python?

提问人:Stefano Borini 提问时间:8/6/2009 最后编辑:Benyamin JafariStefano Borini 更新时间:7/16/2023 访问量:324951

问:

我想知道 Python 中是否有任何用于异步方法调用的库。如果你能做这样的事情,那就太好了

@async
def longComputation():
    <code>


token = longComputation()
token.registerCallback(callback_function)
# alternative, polling
while not token.finished():
    doSomethingElse()
    if token.finished():
        result = token.result()

或者异步调用非异步例程

def longComputation()
    <code>

token = asynccall(longComputation())

如果能有一个更精细的策略,就像在语言核心中一样,那就太好了。这考虑过吗?

异步 python-asyncio 协程

评论

1赞 jonrsharpe 12/28/2015
从 Python 3.4 开始:docs.python.org/3/library/asyncio.html(3.3 有一个向后移植,以及 3.5 的闪亮的新语法)。asyncawait
0赞 RajaRaviVarma 7/20/2016
没有回调机制,但你可以将结果聚合到字典中,它基于 Python 的多处理模块。我相信您可以向修饰的函数添加一个参数作为回调。github.com/alex-sherman/deco
0赞 Adarsh Madrecha 1/7/2019
开始使用。官方文档 - docs.python.org/3/library/concurrency.html

答:

31赞 Meredith L. Patterson 8/6/2009 #1

它不在语言核心中,但一个非常成熟的库可以做你想做的事,那就是 Twisted。它引入了 Deferred 对象,您可以将回调或错误处理程序(“errbacks”)附加到该对象。延迟基本上是一个“承诺”,即一个函数最终会有一个结果。

评论

1赞 Nicholas Riley 11/1/2009
特别是,请查看 twisted.internet.defer (twistedmatrix.com/documents/8.2.0/api/...)。
227赞 Drakosha 8/6/2009 #2

像这样:

import threading

thr = threading.Thread(target=foo, args=(), kwargs={})
thr.start() # Will run "foo"
....
thr.is_alive() # Will return whether foo is running currently
....
thr.join() # Will wait till "foo" is done

有关详细信息,请参阅 https://docs.python.org/library/threading.html 上的文档。

评论

1赞 kk1957 3/22/2013
是的,如果你只需要异步做事,为什么不直接使用 Thread?毕竟,螺纹比工艺轻
28赞 solublefish 5/27/2013
重要提示:由于“全局解释器锁定”,线程的标准实现 (CPython) 对计算绑定任务没有帮助。请参阅库 doc:link
3赞 Mgamerz 6/6/2014
使用 thread.join() 真的是异步的吗?如果您不想阻塞线程(例如 UI 线程)并且不想使用大量资源对其进行 while 循环怎么办?
1赞 Drakosha 6/6/2014
@Mgamerz联接是同步的。您可以让线程将执行结果放在某个队列中,或者/和调用回调。否则,您不知道何时完成(如果有的话)。
1赞 Reda Drissi 3/28/2019
是否可以像使用多处理一样在线程执行结束时调用回调函数。池
2赞 ondra 8/6/2009 #3

有什么理由不使用线程吗?您可以使用该类。 使用 .该函数可以线程并检索结果。如果可以,请重写 和 函数以调用构造函数中指定的函数,并将值保存到类的实例中。threadingfinished()isAlive()result()join()run()__init__

评论

2赞 9/24/2009
如果它是一个计算成本高昂的函数,线程不会给你带来任何东西(实际上它可能会使事情变慢),因为由于 GIL,Python 进程被限制在一个 CPU 内核上。
2赞 Peter Hansen 12/14/2009
@Kurt,虽然这是真的,但OP没有提到性能是他关心的问题。想要异步行为还有其他原因......
0赞 CivFan 1/21/2016
当您想要选择终止异步方法调用时,python 中的线程不是很好,因为只有 python 中的主线程接收信号。
154赞 Lucas S. 8/6/2009 #4

您可以使用 Python 2.6 中添加的多处理模块。您可以使用进程池,然后通过以下方式异步获取结果:

apply_async(func[, args[, kwds[, callback]]])

例如:

import time
from multiprocessing import Pool

def postprocess(result):
    print("finished: %s" % result)

def f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=1)              # Start a worker processes.
    result = pool.apply_async(f, [10], callback=postprocess) # Evaluate "f(10)" asynchronously calling callback when finished.
    print("waiting...")
    time.sleep(1)

这只是一种选择。该模块提供了许多工具来实现您想要的。此外,从中制作装饰器也非常容易。

评论

9赞 user47741 2/6/2010
可能值得记住的是,这会生成单独的进程,而不是进程中的单独线程。这可能会产生一些影响。
7赞 DataGreed 8/30/2009
Lucas S.,不幸的是,你的例子不起作用。回调函数永远不会被调用。
15赞 Michael Jackson 11/18/2011
这有效:result = pool.apply_async(f, [10], callback=finish)
7赞 Drahkar 12/22/2014
要真正在 python 中异步做任何事情,需要使用 multiprocessing 模块来生成新进程。仅仅创建新线程仍然受全局解释器锁的摆布,它阻止 python 进程同时执行多件事。
2赞 Almog Cohen 3/27/2016
如果您不想在使用此解决方案时生成新进程,请将导入更改为 .multiprocessing.dummy 具有在线程而不是进程上实现的完全相同的行为from multiprocessing.dummy import Pool
11赞 Nevyn 3/9/2011 #5

我的解决方案是:

import threading

class TimeoutError(RuntimeError):
    pass

class AsyncCall(object):
    def __init__(self, fnc, callback = None):
        self.Callable = fnc
        self.Callback = callback

    def __call__(self, *args, **kwargs):
        self.Thread = threading.Thread(target = self.run, name = self.Callable.__name__, args = args, kwargs = kwargs)
        self.Thread.start()
        return self

    def wait(self, timeout = None):
        self.Thread.join(timeout)
        if self.Thread.isAlive():
            raise TimeoutError()
        else:
            return self.Result

    def run(self, *args, **kwargs):
        self.Result = self.Callable(*args, **kwargs)
        if self.Callback:
            self.Callback(self.Result)

class AsyncMethod(object):
    def __init__(self, fnc, callback=None):
        self.Callable = fnc
        self.Callback = callback

    def __call__(self, *args, **kwargs):
        return AsyncCall(self.Callable, self.Callback)(*args, **kwargs)

def Async(fnc = None, callback = None):
    if fnc == None:
        def AddAsyncCallback(fnc):
            return AsyncMethod(fnc, callback)
        return AddAsyncCallback
    else:
        return AsyncMethod(fnc, callback)

并完全按照要求工作:

@Async
def fnc():
    pass

评论

0赞 FLAW 3/27/2022
这是一种优雅的方式。它在 Python 2 中有效吗?如果是这样,它几乎就像没有要求的关键词甜度async
23赞 xperroni 3/30/2011 #6

你可以实现一个装饰器来使你的函数异步,尽管这有点棘手。该模块充满了小怪癖和看似任意的限制——不过,更有理由将其封装在一个友好的界面后面。multiprocessing

from inspect import getmodule
from multiprocessing import Pool


def async(decorated):
    r'''Wraps a top-level function around an asynchronous dispatcher.

        when the decorated function is called, a task is submitted to a
        process pool, and a future object is returned, providing access to an
        eventual return value.

        The future object has a blocking get() method to access the task
        result: it will return immediately if the job is already done, or block
        until it completes.

        This decorator won't work on methods, due to limitations in Python's
        pickling machinery (in principle methods could be made pickleable, but
        good luck on that).
    '''
    # Keeps the original function visible from the module global namespace,
    # under a name consistent to its __name__ attribute. This is necessary for
    # the multiprocessing pickling machinery to work properly.
    module = getmodule(decorated)
    decorated.__name__ += '_original'
    setattr(module, decorated.__name__, decorated)

    def send(*args, **opts):
        return async.pool.apply_async(decorated, args, opts)

    return send

下面的代码说明了装饰器的用法:

@async
def printsum(uid, values):
    summed = 0
    for value in values:
        summed += value

    print("Worker %i: sum value is %i" % (uid, summed))

    return (uid, summed)


if __name__ == '__main__':
    from random import sample

    # The process pool must be created inside __main__.
    async.pool = Pool(4)

    p = range(0, 1000)
    results = []
    for i in range(4):
        result = printsum(i, sample(p, 100))
        results.append(result)

    for result in results:
        print("Worker %i: sum value is %i" % result.get())

在实际案例中,我会在装饰器上进行更多阐述,提供一些方法来关闭它进行调试(同时保持未来的接口),或者可能是处理异常的工具;但我认为这很好地证明了这个原则。

评论

0赞 Aminah Nuraini 3/24/2016
这应该是最好的答案。我喜欢它如何返回价值。不像线程那样只是异步运行。
21赞 Roman Rader 10/8/2012 #7

import threading, time

def f():
    print "f started"
    time.sleep(3)
    print "f finished"

threading.Thread(target=f).start()
8赞 Raj 1/9/2014 #8

您可以使用 eventlet。它允许您编写看似同步的代码,但让它在网络上异步运行。

下面是一个超小爬虫的示例:

urls = ["http://www.google.com/intl/en_ALL/images/logo.gif",
     "https://wiki.secondlife.com/w/images/secondlife.jpg",
     "http://us.i1.yimg.com/us.yimg.com/i/ww/beta/y3.gif"]

import eventlet
from eventlet.green import urllib2

def fetch(url):

  return urllib2.urlopen(url).read()

pool = eventlet.GreenPool()

for body in pool.imap(fetch, urls):
  print "got body", len(body)
5赞 Nicholas Hamilton 3/3/2014 #9

像这样的东西对我有用,然后你可以调用该函数,它会将自己调度到一个新线程上。

from thread import start_new_thread

def dowork(asynchronous=True):
    if asynchronous:
        args = (False)
        start_new_thread(dowork,args) #Call itself on a new thread.
    else:
        while True:
            #do something...
            time.sleep(60) #sleep for a minute
    return
74赞 camabeh 3/29/2016 #10

从 Python 3.5 开始,您可以将增强的生成器用于异步函数。

import asyncio
import datetime

增强的生成器语法:

@asyncio.coroutine
def display_date(loop):
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        yield from asyncio.sleep(1)


loop = asyncio.get_event_loop()
# Blocking call which returns when the display_date() coroutine is done
loop.run_until_complete(display_date(loop))
loop.close()

新的 async/await 语法:

async def display_date(loop):
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        await asyncio.sleep(1)


loop = asyncio.get_event_loop()
# Blocking call which returns when the display_date() coroutine is done
loop.run_until_complete(display_date(loop))
loop.close()

评论

10赞 Fab 5/13/2017
@carnabeh,您能否扩展该示例以包含 OP 的“def longComputation()”函数?大多数示例使用 “await asyncio.sleep(1)”,但如果 longComputation() 返回,比如说,双精度值,你就不能只使用 “await longComputation()”。
4赞 zeh 6/22/2020
十年后,这应该是现在公认的答案。当你在 python3.5+ 中谈论 async 时,我想到的应该是 asyncio 和 async 关键字。
0赞 msarafzadeh 7/21/2020
这个答案使用“新的和闪亮的”python语法。这应该是现在的#1答案。
4赞 Big Pumpkin 7/23/2017 #11

您可以使用 concurrent.futures(在 Python 3.2 中添加)。

import time
from concurrent.futures import ThreadPoolExecutor


def long_computation(duration):
    for x in range(0, duration):
        print(x)
        time.sleep(1)
    return duration * 2


print('Use polling')
with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(long_computation, 5)
    while not future.done():
        print('waiting...')
        time.sleep(0.5)

    print(future.result())

print('Use callback')
executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(long_computation, 5)
future.add_done_callback(lambda f: print(f.result()))

print('waiting for callback')

executor.shutdown(False)  # non-blocking

print('shutdown invoked')

评论

0赞 Reda Drissi 3/29/2019
这是一个非常好的答案,因为它是这里唯一一个给出带有回调的线程池可能性的答案
0赞 Alex 1/25/2020
不幸的是,这也受到“全局解释器锁定”的影响。请参阅库文档:链接。与 Python 3.7 版本兼容
0赞 Golden Lion 8/20/2021
这是阻塞异步调用吗
0赞 Keivan 4/20/2018 #12

您可以使用 process。如果你想永远运行它,请在你的函数中使用while(如网络):

from multiprocessing import Process
def foo():
    while 1:
        # Do something

p = Process(target = foo)
p.start()

如果您只想运行一次,请这样做:

from multiprocessing import Process
def foo():
    # Do something

p = Process(target = foo)
p.start()
p.join()
3赞 do-me 12/31/2020 #13

2021 年异步调用的原生 Python 方式,Python 3.9 也适用于 Jupyter / Ipython 内核

Camabeh 的答案是自 Python 3.3 以来要走的路。

async def display_date(loop):
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        await asyncio.sleep(1)


loop = asyncio.get_event_loop()
# Blocking call which returns when the display_date() coroutine is done
loop.run_until_complete(display_date(loop))
loop.close()

这将在 Jupyter Notebook / Jupyter Lab 中起作用,但会引发错误:

RuntimeError: This event loop is already running

由于 Ipython 使用事件循环,我们需要一种称为嵌套异步循环的东西,它尚未在 Python 中实现。幸运的是,有nest_asyncio来解决这个问题。您需要做的就是:

!pip install nest_asyncio # use ! within Jupyter Notebook, else pip install in shell
import nest_asyncio
nest_asyncio.apply()

(基于此线程)

只有当你调用它时才会抛出另一个错误,因为它可能指的是 Ipython 的主循环。loop.close()

RuntimeError: Cannot close a running event loop

一旦有人回答了这个 github 问题,我就会更新这个答案。

6赞 Benyamin Jafari 4/6/2022 #14

Python 3.7 及更高版本中较新的运行方法是使用而不是创建、调用和关闭它:asyncioasyncio.run()looploop.run_until_complete()

import asyncio
import datetime

async def display_date(delay):
    loop = asyncio.get_running_loop()
    end_time = loop.time() + delay
    while True:
        print("Blocking...", datetime.datetime.now())
        await asyncio.sleep(1)
        if loop.time() > end_time:
            print("Done.")
            break


asyncio.run(display_date(5))