FastAPI 以串行而不是并行方式运行 api 调用

FastAPI runs api-calls in serial instead of parallel fashion

提问人:Learning from masters 提问时间:3/18/2022 最后编辑:ChrisLearning from masters 更新时间:8/23/2023 访问量:32225

问:

我有以下代码:

import time
from fastapi import FastAPI, Request
    
app = FastAPI()
    
@app.get("/ping")
async def ping(request: Request):
        print("Hello")
        time.sleep(5)
        print("bye")
        return {"ping": "pong!"}

如果我在localhost上运行我的代码 - 例如, - 在同一浏览器窗口的不同选项卡中,我得到:http://localhost:8501/ping

Hello
bye
Hello
bye

而不是:

Hello
Hello
bye
bye

我已经读过关于使用 ,但我仍然无法实现真正的并行化。怎么了?httpx

异步 python-asyncio fastapi 并发处理

评论

0赞 kevinarpe 12/24/2022
接受的答案对你有帮助吗?我仍然有和你问题一样的行为。对于单个工作线程,所有请求(同步或异步)始终串行运行,而不是并行运行。
0赞 Learning from masters 12/24/2022
老实说,我从来没有设定过使用 uvicorn 的工人数量......难道是这样吗?在不定义它的情况下,对我来说,它们是在没有异步的情况下并行工作的。至少版本fastapi=0.85.0
0赞 Robert Verdes 5/12/2023
Chrome 至少会阻止同一 URL 上的并发 GET reuqests(可能是为了有机会在下一个 URL 上使用 chached versin?在隐身中使用一个 Chrome 进行测试应该可以使用“def”和“async def”。
1赞 Bruce 10/15/2023
简而言之,替换为 ,那么你将获得并发性。time.sleep()await asyncio.sleep()

答:

0赞 user3666197 3/18/2022 #1

问 :
“ ...怎么了?"

答:
FastAPI 文档明确表示该框架使用进程内任务(继承自 Starlette)。

就其本身而言,这意味着所有这些任务都竞相接收(不时)Python Interpreter GIL 锁 - 这是一个有效的 MUTEX 恐怖全局解释器锁,它实际上将任何和所有数量的 Python 解释器进程内线程重新组合成一个且唯一的一个工作线程,而所有其他线程
都保持等待......
[SERIAL]

在细粒度上,你会看到结果 -- 如果为第二个(从第二个 FireFox-tab 手动启动)生成另一个处理程序到达 http-request 实际上花费的时间比睡眠所花费的时间长,则 GIL-lock 交错时间量子循环的结果(在每一轮 GIL-lock 发布-获取-轮盘发生之前,all-wait-one-can-work) Python 解释器内部工作没有显示更多细节, 您可以使用此处的更多详细信息(取决于 O/S 类型或版本)来查看更多线程内 LoD,例如在正在执行的异步修饰代码中:~ 100 [ms]~ 100 [ms]

import time
import threading
from   fastapi import FastAPI, Request

TEMPLATE = "INF[{0:_>20d}]: t_id( {1: >20d} ):: {2:}"

print( TEMPLATE.format( time.perf_counter_ns(),
                        threading.get_ident(),
                       "Python Interpreter __main__ was started ..."
                        )
...
@app.get("/ping")
async def ping( request: Request ):
        """                                __doc__
        [DOC-ME]
        ping( Request ):  a mock-up AS-IS function to yield
                          a CLI/GUI self-evidence of the order-of-execution
        RETURNS:          a JSON-alike decorated dict

        [TEST-ME]         ...
        """
        print( TEMPLATE.format( time.perf_counter_ns(),
                                threading.get_ident(),
                               "Hello..."
                                )
        #------------------------------------------------- actual blocking work
        time.sleep( 5 )
        #------------------------------------------------- actual blocking work
        print( TEMPLATE.format( time.perf_counter_ns(),
                                threading.get_ident(),
                               "...bye"
                                )
        return { "ping": "pong!" }

最后但并非最不重要的一点是,不要犹豫,阅读更多关于所有其他鲨鱼基于线程的代码可能遭受...甚至导致......在窗帘后面......

广告备忘录

GIL 锁、基于线程的池、异步装饰器、阻塞和事件处理的混合体 -- 不确定性的肯定组合 & HWY2HELL ;o)

104赞 Chris 3/18/2022 #2

根据 FastAPI 的文档

当您用 normal 代替 express 声明路径操作函数时 的,它在外部线程池中运行,然后等待 ed,而不是直接调用(因为它会阻止 服务器)。defasync def

此外,如下所述:

如果您使用的是与 一些东西(数据库、API、文件系统等)并且没有 支持使用 ,(目前大多数都是这种情况 数据库库),然后将路径操作函数声明为 通常,只有 .awaitdef

如果您的应用程序(以某种方式)不必与 任何其他内容并等待它响应,请使用 .async def

如果您只是不知道,请使用 normal .def

注意:您可以根据需要在路径操作函数中混合使用,并使用最佳函数定义每个函数 选项。FastAPI 将用它们做正确的事情。defasync def

无论如何,在上述任何一种情况下,FastAPI 仍然可以工作 异步且速度极快。

但是按照上述步骤,它将能够执行一些操作 性能优化。

因此,在 FastAPI 中,端点(在异步编程的上下文中,用 just 定义的函数称为同步函数)在与外部线程池不同的线程中运行,因此,FastAPI 仍将异步工作。换句话说,服务器将同时处理对此类端点的请求。然而,端点在事件循环中运行(在主(单个)线程上),也就是说,服务器还将并发/异步处理对此类端点的请求,只要在此类端点/路由中存在对非阻塞 I/O 绑定操作的 await 调用,例如等待对于(1)通过网络发送的客户端数据,(2)要读取的磁盘中文件的内容,(3)要完成的数据库操作等,(请看这里)。但是,如果一个端点定义了 does for something inside things,为了放弃事件循环中其他任务运行的时间(例如,对相同或其他端点的请求、后台任务等),则对此类端点的每个请求都必须完全完成(即退出端点),然后才能将控制权交还给事件循环并允许其他任务运行。换句话说,在这种情况下,服务器将按顺序处理请求。请注意,相同的概念不仅适用于 FastAPI 端点,也适用于 StreamingResponse 的生成器函数(参见 StreamingResponse 类实现)以及后台任务(参见 BackgroundTask 类实现);因此,在读完这个答案之后,你应该能够决定是否应该定义一个 FastAPI 端点、生成器或后台任务函数 或 。defdefawaitasync defasync defasync defawaitStreamingResponsedefasync def

关键字(仅在函数中起作用)将函数控制权传递回 .换句话说,它暂停周围协程的执行(即,协程对象是调用函数的结果),并告诉让其他东西运行,直到该任务完成。请注意,仅仅因为您可以在端点中定义一个自定义函数,然后定义该函数,这并不意味着您的代码将异步工作,例如,如果该自定义函数包含对 CPU 绑定任务、非异步 I/O 库或任何其他与异步 Python 代码不兼容的阻塞调用。例如,在 FastAPI 中,当使用 UploadFile 的方法时,例如 和 、 FastAPI/Starlette 在后台,实际上在外部线程池中运行 File 对象的此类方法(使用 run_in_threadpool() 函数)并对其进行 s;否则,此类方法/操作将阻止 .您可以通过查看 UploadFile 类的实现来了解更多信息。awaitasync defevent loopasync defevent loopawaitasync defawaitasync deftime.sleep()asyncawait file.read()await file.write()asyncawaitevent loop

请注意,这并不意味着并,而是并发。具有 asyncawait 的异步代码在很多时候被概括为使用协程。协程是协作的(或协作的多任务),这意味着“在任何给定时间,具有协程的程序运行其一个协程,并且该正在运行的协程仅在显式请求挂起时才暂停其执行”(有关协程的更多信息,请参阅此处和此处)。本文所述:async

具体来说,每当执行当前正在运行的协程时 到达表达式时,协程可能会被挂起,并且 另一个先前挂起的协程可能会恢复执行,如果它是什么 已挂起,此后返回了一个值。悬架也可以 当一个块从 异步迭代器或输入块时或 退出,因为这些操作在后台使用。awaitasync forasync withawait

但是,如果直接在函数/端点内执行/调用阻塞 I/O 绑定或 CPU 绑定操作,它将阻塞主线程(因此,)。因此,阻止操作(例如在终结点中)将阻止整个服务器(如问题中提供的代码示例所示)。因此,如果您的端点不打算进行任何调用,则可以直接声明它,它将在外部线程池中运行,然后进行编辑,如前所述(以下各节中提供了更多解决方案)。例:async defevent looptime.sleep()async defasyncdefawait

@app.get("/ping")
def ping(request: Request):
    #print(request.client)
    print("Hello")
    time.sleep(5)
    print("bye")
    return "pong"

否则,如果必须在端点内执行的函数是必须执行的函数,则应使用 定义端点。为了演示这一点,下面的示例使用了 asyncio.sleep() 函数(来自 asyncio 库),该函数提供了非阻塞睡眠操作。该方法将暂停周围协程的执行(直到休眠操作完成),从而允许事件循环中的其他任务运行。这里这里也给出了类似的例子。asyncawaitasync defawait asyncio.sleep()

import asyncio
 
@app.get("/ping")
async def ping(request: Request):
    #print(request.client)
    print("Hello")
    await asyncio.sleep(5)
    print("bye")
    return "pong"

上述两个端点将按照问题中提到的相同顺序将指定的消息打印到屏幕上(如果两个请求大约同时到达),即:

Hello
Hello
bye
bye

重要提示

当您第二次(第三次,依此类推)调用终结点时,请记住从与浏览器主会话隔离的选项卡执行此操作;否则,后续请求(即在第一个请求之后)将被浏览器(在客户端)阻止,因为浏览器将在发送下一个请求之前等待服务器对前一个请求的响应。您可以通过在端点内部使用来确认这一点,您会看到所有传入请求的 和 编号相同(如果请求是从同一浏览器窗口/会话中打开的选项卡发起的),因此,这些请求将按顺序处理,因为浏览器首先按顺序发送它们。要解决此问题,您可以:print(request.client)hostnameport

  1. 重新加载相同的选项卡(与正在运行的选项卡一样),或者

  2. 在无痕式窗口中打开新标签页,或者

  3. 使用其他浏览器/客户端发送请求,或者

  4. 使用该库发出异步 HTTP 请求,以及可等待asyncio.gather(),它允许同时执行多个异步操作,然后以与 awaitables(任务)传递给该函数的相同顺序返回结果列表(请查看此答案了解更多详细信息)。httpx

    示例

    import httpx
    import asyncio
    
    URLS = ['http://127.0.0.1:8000/ping'] * 2
    
    async def send(url, client):
        return await client.get(url, timeout=10)
    
    async def main():
        async with httpx.AsyncClient() as client:
            tasks = [send(url, client) for url in URLS]
            responses = await asyncio.gather(*tasks)
            print(*[r.json() for r in responses], sep='\n')
    
    asyncio.run(main())
    

    如果您必须调用不同的端点,这些端点可能需要不同的时间来处理请求,并且您希望在从服务器返回响应后立即在客户端打印响应,而不是等待收集所有任务的结果并按照任务传递给函数的相同顺序打印它们,则可以将上述示例的函数替换为如下所示:asyncio.gather()send()send()

    async def send(url, client):
        res = await client.get(url, timeout=10)
        print(res.json())
        return res
    

Async/await和阻止 I/O 密集型或 CPU 密集型操作

如果需要使用(因为端点内的协程可能需要),但还需要一些同步阻塞 I/O 绑定或 CPU 绑定操作(长时间运行的计算任务),这些操作将阻止(本质上是整个服务器)并且不允许其他请求通过,例如:async defawaitevent loop

@app.post("/ping")
async def ping(file: UploadFile = File(...)):
    print("Hello")
    try:
        contents = await file.read()
        res = cpu_bound_task(contents)  # this will block the event loop
    finally:
        await file.close()
    print("bye")
    return "pong"

然后:

  1. 您应该检查是否可以将端点的定义更改为 normal 而不是 。例如,如果端点中唯一需要等待的方法是读取文件内容的方法(如您在下面的注释部分中提到的),则可以将端点参数的类型声明为 (即 ),因此,FastAPI 将为您读取文件,并且您将收到内容。因此,没有必要使用 .请注意,上述方法应该适用于小文件,因为 enitre 文件内容将存储到内存中(请参阅有关文件参数的文档);因此,如果您的系统没有足够的 RAM 来容纳累积的数据(例如,如果您有 8GB 的 RAM,则无法加载 50GB 的文件),您的应用程序最终可能会崩溃。或者,您可以直接调用 SpooledTemporaryFile 的方法(可以通过对象的属性访问),这样您就不必再次使用该方法——并且您现在可以使用 normal 声明您的端点,每个请求都将在单独的线程中运行(示例如下)。有关如何上传 ,以及 Starlette/FastAPI 如何在幕后使用的更多详细信息,请查看此答案和此答案defasync defbytesfile: bytes = File()bytesawait file.read().read().fileUploadFileawait.read()defFileSpooledTemporaryFile

    @app.post("/ping")
    def ping(file: UploadFile = File(...)):
        print("Hello")
        try:
            contents = file.file.read()
            res = cpu_bound_task(contents)
        finally:
            file.file.close()
        print("bye")
        return "pong"
    
  2. 使用模块中的 FastAPI(Starlette)的 run_in_threadpool() 函数(@tiangolo如此处所建议的那样),它“将在单独的线程中运行函数,以确保主线程(运行协程的地方)不会被阻塞”参见此处)。正如@tiangolo在这里所描述的,“是一个可等待的函数,第一个参数是一个普通函数,接下来的参数直接传递给该函数。它支持序列参数和关键字参数”。concurrencyrun_in_threadpool

    from fastapi.concurrency import run_in_threadpool
    
    res = await run_in_threadpool(cpu_bound_task, contents)
    
  3. 或者,在使用 asyncio.get_running_loop() 获取运行之后,使用 的 loop.run_in_executor() 来运行任务,在这种情况下,您可以让它完成并返回结果,然后再继续下一行代码。作为 executor 参数传递,将使用默认的 executor;即 ThreadPoolExecutorasyncioevent loopawaitNone

    import asyncio
    
    loop = asyncio.get_running_loop()
    res = await loop.run_in_executor(None, cpu_bound_task, contents)
    

    或者,如果你想传递关键字参数,你可以使用一个表达式(例如,),或者,最好是functools.partial(),这在loop.run_in_executor()的文档中是特别推荐的:lambdalambda: cpu_bound_task(some_arg=contents)

    import asyncio
    from functools import partial
    
    loop = asyncio.get_running_loop()
    res = await loop.run_in_executor(None, partial(cpu_bound_task, some_arg=contents))
    

    还可以在自定义 ThreadPoolExecutor 中运行任务。例如:

    import asyncio
    import concurrent.futures
    
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor() as pool:
        res = await loop.run_in_executor(pool, cpu_bound_task, contents)
    

    在 Python 3.9+ 中,您还可以使用 asyncio.to_thread() 在单独的线程中异步运行同步函数——这本质上是在后台使用,如 asyncio.to_thread() 的实现所示。该函数采用要执行的阻塞函数的名称,以及该函数的任何参数(*args 和/或 **kwargs),然后返回一个可以编辑的协程。例:await loop.run_in_executor(None, func_call)to_thread()await

    import asyncio
    
    res = await asyncio.to_thread(cpu_bound_task, contents)
    
  4. ThreadPoolExecutor将成功防止 被阻塞,但不会给你带来并行运行代码所期望的性能改进;特别是,当需要执行操作时,例如此处描述的操作(例如,音频或图像处理、机器学习等)。因此,最好在单独的进程中运行受 CPU 限制的任务(使用 ProcessPoolExecutor,如下所示),同样,您可以与 集成,以便它完成其工作并返回结果。如此处所述,在 Windows 上,保护代码的主循环以避免递归生成子进程等非常重要。基本上,您的代码必须在 if __name__ == '__main__': 下。event loopCPU-boundasyncioawait

    import concurrent.futures
    
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        res = await loop.run_in_executor(pool, cpu_bound_task, contents) 
    
  5. 使用更多工作线程。例如,(如果您使用 Gunicorn 作为 Uvicorn worker 的流程管理器,请查看此答案)。注意:每个工人“都有自己的东西、变量和记忆”。这意味着变量/对象等不会在进程/工作线程之间共享。在这种情况下,应考虑使用数据库存储或键值存储(缓存),如此此处所述。此外,请注意,“如果在代码中消耗大量内存,则每个进程将消耗等量的内存”。uvicorn main:app --workers 4global

  6. 如果你需要执行繁重的后台计算,并且你不一定需要它由同一个进程运行(例如,你不需要共享内存、变量等),你可能会从使用其他更大的工具(如 Celery)中受益,如 FastAPI 的文档中所述。

评论

0赞 Learning from masters 3/18/2022
事实上,这是一项试验,用于检查为什么另一个调用正在串行运行。另一个函数调用“UploadFile”并执行“await file.read()”并运行串行。此外,这是在亚马逊服务器产品内运行的,在亚马逊的 api 网关之后,因此所有请求都来自同一个 IP,因为用户连接到亚马逊,亚马逊服务器调用我的 api。问题是文件的操作很长,如果我在最后序列化了这个,由于亚马逊的限制,我会超时。我想我将不得不去你提供的最后一个链接!
1赞 zhanymkanov 8/16/2022
计算任务是指 CPU 密集型负载。在 CPython 中,线程不会对 CPU 任务产生明显的提升,因为 GIL 目前只允许一个线程处于活动状态。因此,路线和在这里都无济于事。defrun_in_threadpool
1赞 Chris 8/16/2022
@zhanymkanov感谢您的评论。我知道 Python 的 GIL,因此,我计划很快扩展上述答案,以使用多处理提供进一步的解决方案。上面提到的选项 1(即增加数量 )已经是解决此问题的一种方法。无论如何,在等待的外部线程池中运行此类任务,而不是直接调用(尽管不提供真正的并行性)总比没有好,因为否则此类任务会阻塞整个服务器。workers
1赞 Chris 9/3/2022
@bravmi 不客气。上述相关部分已更新;希望现在更清楚了。有关更多详细信息,请查看上面提供的链接。
1赞 lenin 10/25/2023
很遗憾,在fastAPI官方文档中找不到这么好的答案,该文档没有清楚地描述这些。这将为很多人节省大量时间