提问人:Learning from masters 提问时间:3/18/2022 最后编辑:ChrisLearning from masters 更新时间:8/23/2023 访问量:32588
FastAPI 以串行而不是并行方式运行 api 调用
FastAPI runs api-calls in serial instead of parallel fashion
问:
我有以下代码:
import time
from fastapi import FastAPI, Request
app = FastAPI()
@app.get("/ping")
async def ping(request: Request):
print("Hello")
time.sleep(5)
print("bye")
return {"ping": "pong!"}
如果我在localhost上运行我的代码 - 例如, - 在同一浏览器窗口的不同选项卡中,我得到:http://localhost:8501/ping
Hello
bye
Hello
bye
而不是:
Hello
Hello
bye
bye
我已经读过关于使用 ,但我仍然无法实现真正的并行化。怎么了?httpx
答:
问 :
“ ...怎么了?"
答:
FastAPI 文档明确表示该框架使用进程内任务(继承自 Starlette)。
就其本身而言,这意味着所有这些任务都竞相接收(不时)Python Interpreter GIL 锁 - 这是一个有效的 MUTEX 恐怖全局解释器锁,它实际上将任何和所有数量的 Python 解释器进程内线程重新组合成一个且唯一的一个工作线程,而所有其他线程
都保持等待......[SERIAL]
在细粒度上,你会看到结果 -- 如果为第二个(从第二个 FireFox-tab 手动启动)生成另一个处理程序到达 http-request 实际上花费的时间比睡眠所花费的时间长,则 GIL-lock 交错时间量子循环的结果(在每一轮 GIL-lock 发布-获取-轮盘发生之前,all-wait-one-can-work) Python 解释器内部工作没有显示更多细节, 您可以使用此处的更多详细信息(取决于 O/S 类型或版本)来查看更多线程内 LoD,例如在正在执行的异步修饰代码中:~ 100 [ms]
~ 100 [ms]
import time
import threading
from fastapi import FastAPI, Request
TEMPLATE = "INF[{0:_>20d}]: t_id( {1: >20d} ):: {2:}"
print( TEMPLATE.format( time.perf_counter_ns(),
threading.get_ident(),
"Python Interpreter __main__ was started ..."
)
...
@app.get("/ping")
async def ping( request: Request ):
""" __doc__
[DOC-ME]
ping( Request ): a mock-up AS-IS function to yield
a CLI/GUI self-evidence of the order-of-execution
RETURNS: a JSON-alike decorated dict
[TEST-ME] ...
"""
print( TEMPLATE.format( time.perf_counter_ns(),
threading.get_ident(),
"Hello..."
)
#------------------------------------------------- actual blocking work
time.sleep( 5 )
#------------------------------------------------- actual blocking work
print( TEMPLATE.format( time.perf_counter_ns(),
threading.get_ident(),
"...bye"
)
return { "ping": "pong!" }
最后但并非最不重要的一点是,不要犹豫,阅读更多关于所有其他鲨鱼基于线程的代码可能遭受...甚至导致......在窗帘后面......
广告备忘录
GIL 锁、基于线程的池、异步装饰器、阻塞和事件处理的混合体 -- 不确定性的肯定组合 & HWY2HELL ;o)
根据 FastAPI 的文档:
当您用 normal 代替 express 声明路径操作函数时 的,它在外部线程池中运行,然后等待 ed,
而不是直接调用(因为它会阻止 服务器)。
def
async def
此外,如下所述:
如果您使用的是与 一些东西(数据库、API、文件系统等)并且没有 支持使用 ,(目前大多数都是这种情况 数据库库),然后将路径操作函数声明为 通常,只有 .
await
def
如果您的应用程序(以某种方式)不必与 任何其他内容并等待它响应,请使用 .
async def
如果您只是不知道,请使用 normal .
def
注意:您可以根据需要在路径操作函数中混合使用,并使用最佳函数定义每个函数 选项。FastAPI 将用它们做正确的事情。
def
async def
无论如何,在上述任何一种情况下,FastAPI 仍然可以工作 异步且速度极快。
但是按照上述步骤,它将能够执行一些操作 性能优化。
因此,在 FastAPI 中,端点(在异步编程的上下文中,用 just 定义的函数称为同步函数)在与外部线程池不同的线程中运行,因此,FastAPI 仍将异步工作。换句话说,服务器将同时处理对此类端点的请求。然而,端点在事件循环
中运行(在主(单个)线程上),也就是说,服务器还将并发/异步处理对此类端点的请求,只要在此类端点/路由中存在对非阻塞 I/O 绑定操作的 await
调用,例如等待对于(1)通过网络发送的客户端数据,(2)要读取的磁盘中文件的内容,(3)要完成的数据库操作等,(请看这里)。但是,如果一个端点定义了 does for something inside things,为了放弃事件循环中其他任务运行的时间(例如,对相同或其他端点的请求、后台任务等),则对此类端点的每个请求都必须完全完成(即退出端点),然后才能将控制权交还给事件循环并允许其他任务运行。换句话说,在这种情况下,服务器将按顺序处理请求。请注意,相同的概念不仅适用于 FastAPI 端点,也适用于 StreamingResponse 的生成器函数(参见 StreamingResponse
类实现)以及后台任务
(参见 BackgroundTask
类实现);因此,在读完这个答案之后,你应该能够决定是否应该定义一个 FastAPI 端点、生成器或后台任务函数 或 。
def
def
await
async def
async def
async def
await
StreamingResponse
def
async def
关键字(仅在函数中起作用)将函数控制权传递回 .换句话说,它暂停周围协程的执行(即,协程对象是调用函数的结果),并告诉让其他东西运行,直到该任务完成。请注意,仅仅因为您可以在端点中定义一个自定义函数,然后定义该函数,这并不意味着您的代码将异步工作,例如,如果该自定义函数包含对 CPU 绑定任务、非异步 I/O 库或任何其他与异步 Python 代码不兼容的阻塞调用。例如,在 FastAPI 中,当使用 UploadFile
的方法时,例如 和 、 FastAPI/Starlette 在后台,实际上在外部线程池中运行 File 对象的此类方法(使用 run_in_threadpool(
) 函数)并对其进行 s;否则,此类方法/操作将阻止 .您可以通过查看 UploadFile
类的实现来了解更多信息。await
async def
event loop
async def
event loop
await
async def
await
async def
time.sleep()
async
await file.read()
await file.write()
async
await
event loop
请注意,这并不意味着并行,而是并发。具有 async
和 await
的异步代码在很多时候被概括为使用协程。协程是协作的(或协作的多任务),这意味着“在任何给定时间,具有协程的程序只运行其一个协程,并且该正在运行的协程仅在显式请求挂起时才暂停其执行”(有关协程的更多信息,请参阅此处和此处)。如本文所述:async
具体来说,每当执行当前正在运行的协程时 到达表达式时,协程可能会被挂起,并且 另一个先前挂起的协程可能会恢复执行,如果它是什么 已挂起,此后返回了一个值。悬架也可以 当一个块从 异步迭代器或输入块时或 退出,因为这些操作在后台使用。
await
async for
async with
await
但是,如果直接在函数/端点内执行/调用阻塞 I/O 绑定或 CPU 绑定操作,它将阻塞主线程(因此,)。因此,阻止操作(例如在终结点中)将阻止整个服务器(如问题中提供的代码示例所示)。因此,如果您的端点不打算进行任何调用,则可以直接声明它,它将在外部线程池中运行,然后进行编辑,如前所述(以下各节中提供了更多解决方案)。例:async def
event loop
time.sleep()
async def
async
def
await
@app.get("/ping")
def ping(request: Request):
#print(request.client)
print("Hello")
time.sleep(5)
print("bye")
return "pong"
否则,如果必须在端点内执行的函数是必须执行的函数,则应使用 定义端点。为了演示这一点,下面的示例使用了 asyncio.sleep()
函数(来自 asyncio
库),该函数提供了非阻塞睡眠操作。该方法将暂停周围协程的执行(直到休眠操作完成),从而允许事件循环中的其他任务运行。这里和这里也给出了类似的例子。async
await
async def
await asyncio.sleep()
import asyncio
@app.get("/ping")
async def ping(request: Request):
#print(request.client)
print("Hello")
await asyncio.sleep(5)
print("bye")
return "pong"
上述两个端点将按照问题中提到的相同顺序将指定的消息打印到屏幕上(如果两个请求大约同时到达),即:
Hello
Hello
bye
bye
重要提示
当您第二次(第三次,依此类推)调用终结点时,请记住从与浏览器主会话隔离的选项卡执行此操作;否则,后续请求(即在第一个请求之后)将被浏览器(在客户端)阻止,因为浏览器将在发送下一个请求之前等待服务器对前一个请求的响应。您可以通过在端点内部使用来确认这一点,您会看到所有传入请求的 和 编号相同(如果请求是从同一浏览器窗口/会话中打开的选项卡发起的),因此,这些请求将按顺序处理,因为浏览器首先按顺序发送它们。要解决此问题,您可以:print(request.client)
hostname
port
重新加载相同的选项卡(与正在运行的选项卡一样),或者
在无痕式窗口中打开新标签页,或者
使用其他浏览器/客户端发送请求,或者
使用该库发出异步 HTTP 请求,以及可等待的
asyncio.gather(),
它允许同时执行多个异步操作,然后以与 awaitables(任务)传递给该函数的相同顺序返回结果列表(请查看此答案了解更多详细信息)。httpx
示例:
import httpx import asyncio URLS = ['http://127.0.0.1:8000/ping'] * 2 async def send(url, client): return await client.get(url, timeout=10) async def main(): async with httpx.AsyncClient() as client: tasks = [send(url, client) for url in URLS] responses = await asyncio.gather(*tasks) print(*[r.json() for r in responses], sep='\n') asyncio.run(main())
如果您必须调用不同的端点,这些端点可能需要不同的时间来处理请求,并且您希望在从服务器返回响应后立即在客户端打印响应,而不是等待收集所有任务的结果并按照任务传递给函数的相同顺序打印它们,则可以将上述示例的函数替换为如下所示:
asyncio.gather()
send()
send()
async def send(url, client): res = await client.get(url, timeout=10) print(res.json()) return res
Async
/await
和阻止 I/O 密集型或 CPU 密集型操作
如果需要使用(因为端点内的协程可能需要),但还需要一些同步阻塞 I/O 绑定或 CPU 绑定操作(长时间运行的计算任务),这些操作将阻止(本质上是整个服务器)并且不允许其他请求通过,例如:async def
await
event loop
@app.post("/ping")
async def ping(file: UploadFile = File(...)):
print("Hello")
try:
contents = await file.read()
res = cpu_bound_task(contents) # this will block the event loop
finally:
await file.close()
print("bye")
return "pong"
然后:
您应该检查是否可以将端点的定义更改为 normal 而不是 。例如,如果端点中唯一需要等待的方法是读取文件内容的方法(如您在下面的注释部分中提到的),则可以将端点参数的类型声明为 (即 ),因此,FastAPI 将为您读取文件,并且您将收到内容。因此,没有必要使用 .请注意,上述方法应该适用于小文件,因为 enitre 文件内容将存储到内存中(请参阅
有关文件
参数的文档);因此,如果您的系统没有足够的 RAM 来容纳累积的数据(例如,如果您有 8GB 的 RAM,则无法加载 50GB 的文件),您的应用程序最终可能会崩溃。或者,您可以直接调用SpooledTemporaryFile
的方法(可以通过对象的属性访问),这样您就不必再次使用该方法——并且您现在可以使用 normal 声明您的端点,每个请求都将在单独的线程中运行(示例如下)。有关如何上传 ,以及 Starlette/FastAPI 如何在幕后使用的更多详细信息,请查看此答案和此答案。def
async def
bytes
file: bytes = File()
bytes
await file.read()
.read()
.file
UploadFile
await
.read()
def
File
SpooledTemporaryFile
@app.post("/ping") def ping(file: UploadFile = File(...)): print("Hello") try: contents = file.file.read() res = cpu_bound_task(contents) finally: file.file.close() print("bye") return "pong"
使用模块中的 FastAPI(Starlette)的 run_in_threadpool() 函数(@tiangolo如此处所建议的那样),它“将在单独的线程中运行函数,以确保主线程(运行协程的地方)不会被阻塞”
(
参见此处)。正如@tiangolo在这里所描述的,“是一个可等待的函数,第一个参数是一个普通函数,接下来的参数直接传递给该函数。它支持序列参数和关键字参数”。concurrency
run_in_threadpool
from fastapi.concurrency import run_in_threadpool res = await run_in_threadpool(cpu_bound_task, contents)
或者,在使用 asyncio.get_running_loop() 获取运行之后,使用 的
loop.run_in_executor()
来运行任务,在这种情况下,您可以让它完成并返回结果,然后再继续下一行代码。作为 executor 参数传递,将使用默认的 executor;即
ThreadPoolExecutor
:asyncio
event loop
await
None
import asyncio loop = asyncio.get_running_loop() res = await loop.run_in_executor(None, cpu_bound_task, contents)
或者,如果你想传递关键字参数,你可以使用一个表达式(例如,),或者,最好是
functools.partial(),
这在loop.run_in_executor()
的文档中是特别推荐的:lambda
lambda: cpu_bound_task(some_arg=contents)
import asyncio from functools import partial loop = asyncio.get_running_loop() res = await loop.run_in_executor(None, partial(cpu_bound_task, some_arg=contents))
还可以在自定义
ThreadPoolExecutor
中运行任务。例如:import asyncio import concurrent.futures loop = asyncio.get_running_loop() with concurrent.futures.ThreadPoolExecutor() as pool: res = await loop.run_in_executor(pool, cpu_bound_task, contents)
在 Python 3.9+ 中,您还可以使用 asyncio.to_thread() 在单独的线程中异步运行同步函数——这本质上是在后台使用,如
asyncio.to_thread()
的实现所示。该函数采用要执行的阻塞函数的名称,以及该函数的任何参数(*args 和/或 **kwargs),然后返回一个可以编辑的协程。例:
await loop.run_in_executor(None, func_call)
to_thread()
await
import asyncio res = await asyncio.to_thread(cpu_bound_task, contents)
ThreadPoolExecutor
将成功防止 被阻塞,但不会给你带来并行运行代码所期望的性能改进;特别是,当需要执行操作时,例如此处描述的操作(例如,音频或图像处理、机器学习等)。因此,最好在单独的进程中运行受 CPU 限制的任务(使用ProcessPoolExecutor
,如下所示),同样,您可以与 集成,以便它完成其工作并返回结果。如此处所述,在 Windows 上,保护代码的主循环以避免递归生成子进程等非常重要。基本上,您的代码必须在if __name__ == '__main__':
下。event loop
CPU-bound
asyncio
await
import concurrent.futures loop = asyncio.get_running_loop() with concurrent.futures.ProcessPoolExecutor() as pool: res = await loop.run_in_executor(pool, cpu_bound_task, contents)
使用更多工作线程。例如,(如果您使用 Gunicorn 作为 Uvicorn worker 的流程管理器,请查看此答案)。注意:每个工人“都有自己的东西、变量和记忆”。这意味着变量/对象等不会在进程/工作线程之间共享。在这种情况下,应考虑使用数据库存储或键值存储(缓存),如此处和此处所述。此外,请注意,“如果在代码中消耗大量内存,则每个进程将消耗等量的内存”。
uvicorn main:app --workers 4
global
如果你需要执行繁重的后台计算,并且你不一定需要它由同一个进程运行(例如,你不需要共享内存、变量等),你可能会从使用其他更大的工具(如 Celery)中受益,如 FastAPI 的文档中所述。
评论
def
run_in_threadpool
多处理
提供进一步的解决方案。上面提到的选项 1(即增加数量 )已经是解决此问题的一种方法。无论如何,在等待的外部线程池中运行此类任务,而不是直接调用(尽管不提供真正的并行性)总比没有好,因为否则此类任务会阻塞整个服务器。workers
评论
time.sleep()
await asyncio.sleep()