提问人:Mihail Bury 提问时间:11/4/2023 最后编辑:Brian Tompsett - 汤莱恩Mihail Bury 更新时间:11/4/2023 访问量:52
异步执行 pydantic 验证
Perform pydantic validation asynchronously
问:
我正在使用 FastAPI 为市场创建一个 API,用户可以在其中购买和出售物品。但要销售商品,用户必须注册为卖家。为此,他应该满足 TIN、TRRC、BIC 和其他银行凭证。显然,这些数据应该得到验证。我希望它在我的pydantic模式中得到验证。为了验证,我想使用公共 API。当我向 API 发送请求时,它会返回 if valid, else 。为了验证这些数据中的每一部分,我在 pydantic 模型中使用了 separete 方法。我想使用 aiohttp 向 API 发送异步请求。在文档中,强烈建议对整个应用程序使用一个会话对象,并且不要为每个新请求创建一个新会话。为了解决这个问题,我创建了一个类,该类实现了模式并获取会话对象(如果存在),或者创建它并在全部完成后关闭会话。
以下是该类的代码:GET
HTTP_200
HTTP_400
Singletone
import aiohttp
class HTTPSessionManager:
_session = None
@classmethod
def get_session(cls) -> aiohttp.ClientSession:
if cls._session is None:
cls._session = aiohttp.ClientSession()
return cls._session
@classmethod
async def close_session(cls):
"""
Session closure
"""
if cls._session is not None:
await cls._session.close()
此外,我有一种通用的方法来验证每个字段以注册为卖家(BIC、TIN 等)。此方法采用 URL 和字段来验证:
from fastapi import HTTPException
from starlette import status
from src.http_session import HTTPSessionManager
async def validate(url: str, obj: str):
session = HTTPSessionManager.get_session()
try:
response = await session.get(url, params={obj: obj})
if response.status == 200:
return True
else:
return False
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
finally:
await session.close()
这个方法应该在pydantic模型中调用,如下所示:
class SellerIn(BaseModel):
mobile: str = Field(regex=r'\b\+?[7,8](\s*\d{3}\s*\d{3}\s*\d{2}\s*\d{2})\b') # group
company_name: str
company_description: str
bank_name: str
tin: str = Field()
bic: str
trrc: str
an: str
additional: str = None
@classmethod
@field_validator('bic')
async def validate_tin(cls, value):
return await validate(obj=value, url='https://htmlweb.ru/api.php?obj=validator&m=bic')
而且我不太确定,我是否可以异步使用 pydantic。如果我不能,考虑到我希望我对公共 API 的请求异步运行,我该如何解决这个问题。
附言我的python版本是3.11,FastAPI是 0.104.0,aiohttp是3.8.6,pydantic是2.4.2
我将非常感谢每一个回应和建议。
答: 暂无答案
评论
asyncio.sleep