异步执行 pydantic 验证

Perform pydantic validation asynchronously

提问人:Mihail Bury 提问时间:11/4/2023 最后编辑:Brian Tompsett - 汤莱恩Mihail Bury 更新时间:11/4/2023 访问量:52

问:

我正在使用 FastAPI 为市场创建一个 API,用户可以在其中购买和出售物品。但要销售商品,用户必须注册为卖家。为此,他应该满足 TINTRRC、BIC 和其他银行凭证。显然,这些数据应该得到验证。我希望它在我的pydantic模式中得到验证。为了验证,我想使用公共 API。当我向 API 发送请求时,它会返回 if valid, else 。为了验证这些数据中的每一部分,我在 pydantic 模型中使用了 separete 方法。我想使用 aiohttp 向 API 发送异步请求。在文档中,强烈建议对整个应用程序使用一个会话对象,并且不要为每个新请求创建一个新会话。为了解决这个问题,我创建了一个类,该类实现了模式并获取会话对象(如果存在),或者创建它并在全部完成后关闭会话。
以下是该类的代码:
GETHTTP_200HTTP_400Singletone

import aiohttp


class HTTPSessionManager:
    _session = None

    @classmethod
    def get_session(cls) -> aiohttp.ClientSession:
        if cls._session is None:
            cls._session = aiohttp.ClientSession()
        return cls._session

    @classmethod
    async def close_session(cls):
        """
        Session closure
        """
        if cls._session is not None:
            await cls._session.close()

此外,我有一种通用的方法来验证每个字段以注册为卖家(BICTIN 等)。此方法采用 URL 和字段来验证:

from fastapi import HTTPException
from starlette import status

from src.http_session import HTTPSessionManager


async def validate(url: str, obj: str):
    session = HTTPSessionManager.get_session()
    try:

        response = await session.get(url, params={obj: obj})

        if response.status == 200:
            return True
        else:
            return False
    except Exception as e:
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
    finally:

        await session.close()

这个方法应该在pydantic模型中调用,如下所示:

class SellerIn(BaseModel):
    mobile: str = Field(regex=r'\b\+?[7,8](\s*\d{3}\s*\d{3}\s*\d{2}\s*\d{2})\b')   # group
    company_name: str
    company_description: str
    bank_name: str
    tin: str = Field()
    bic: str
    trrc: str
    an: str
    additional: str = None

    @classmethod
    @field_validator('bic')
    async def validate_tin(cls, value):
        return await validate(obj=value, url='https://htmlweb.ru/api.php?obj=validator&m=bic')

而且我不太确定,我是否可以异步使用 pydantic。如果我不能,考虑到我希望我对公共 API 的请求异步运行,我该如何解决这个问题。
附言我的python版本是3.11,FastAPI是 0.104.0,aiohttp是3.8.6,pydantic是2.4.2

我将非常感谢每一个回应和建议。

蟒蛇 fastapi pydantic aiohttp

评论

0赞 MatsLindh 11/4/2023
你见过 github.com/pydantic/pydantic/issues/857github.com/pydantic/pydantic/discussions/7627 吗?你只是尝试了一下,看看是否有效吗?asyncio.sleep
0赞 Mihail Bury 11/4/2023
我已经看过第一期,但我认为在该问题发布后的几年里,我改变了一些东西。那么,如果没有,我是否应该在路由器而不是pydantic中执行此验证?
0赞 M.O. 11/4/2023
@MihailBury 是的,外部验证(例如针对数据库或其他 API 的验证)最好在路由函数中处理
1赞 MatsLindh 11/5/2023
如果你不能在服务/存储库层处理它,最好在你的服务/存储库层中处理它 - 我会尝试将该业务逻辑排除在控制器本身之外。

答: 暂无答案