提问人:j123b567 提问时间:10/30/2023 更新时间:10/30/2023 访问量:45
使用 FastAPI 从 zipfile 提供文件
Serve file from zipfile using FastAPI
问:
我想从 zip 文件提供一个文件。
有没有一些方法可以从 zip 文件服务器文件,这很好并且支持处理异常?
这是我的实验
有第一种幼稚的方法,但提供的文件可以任意大,所以我不想在内存中加载整个内容。
import zipfile
from typing import Annotated, Any
from fastapi import FastAPI, Depends
from fastapi.responses import StreamingResponse
app = FastAPI()
zip_file_path = "data.zip"
file_path = "index.html"
@app.get("/zip0")
async def zip0():
with zipfile.ZipFile(zip_file_path, 'r') as zip_file:
return Response(content=zip_file.read(file_path))
FastAPI/starlette 提供了应该完全符合我想要的功能,但在这种情况下,它不适用于 zipfile 声明StreamingResponse
read from closed file.
@app.get("/zip1")
async def zip1():
with zipfile.ZipFile(zip_file_path, 'r') as zip_file:
with zip_file.open(file_path) as file_like:
return StreamingResponse(file_like)
我可以通过将所有内容移动到另一个函数并将其设置为依赖项来进行黑客攻击。现在我可以使用它,以便它正确地流式传输内容并在完成后关闭文件。问题是这是一个丑陋的黑客,也不支持用作依赖项。只是“修复”类型注释 from to 会引发一个断言,说用于依赖注入是一个很大的禁忌。yield
Response
Any
StreamingResponse
StreamingResponse
def get_file_stream_from_zip():
with zipfile.ZipFile(zip_file_path, 'r') as zip_file:
with zip_file.open(file_path) as file_like:
yield StreamingResponse(file_like)
@app.get("/zip2")
async def zip2(
streaming_response: Annotated[Any, Depends(get_file_stream_from_zip)],
):
return streaming_response
我可以在中间做一些看似合法的事情,但无法处理异常。当代码识别出 zip 文件不存在时,已经发送了标头。这不是以前方法的问题。
def get_file_from_zip():
with zipfile.ZipFile(zip_file_path, 'r') as zip_file:
with zip_file.open(file_path) as file_like:
yield file_like
@app.get("/zip3")
async def zip3(
file_like: Annotated[BinaryIO, Depends(get_file_from_zip)],
):
return StreamingResponse(file_like)
答: 暂无答案
评论
with
with