在 Google Cloud Functions 的请求中使用并发和 python asyncio

Using concurrency with python asyncio in requests in Google Cloud Functions

提问人:Manuel Huppertz 提问时间:11/13/2023 最后编辑:Manuel Huppertz 更新时间:11/17/2023 访问量:89

问:

我正在尝试向 OpenAI API 发送请求,以通过 Google Cloud Function 翻译文本列表。由于 API 需要一点时间才能回答,并且我尝试每天翻译几百条文本,因此我尝试通过将请求从 GCF 并发发送到 OpenAI API 来加快流程。

因此,我在 Jupyter Notebook 中用 Python 制作了该函数的原型,并设法运行了一个并发函数,因为我知道事件循环管理存在一些差异,但最终将运行时间从大约 30 分钟减少到 4-5 分钟。然后,我试图弄清楚在Jupyter和Google Cloud Function中运行该函数之间的区别,并最终设法让它在那里运行。

但是,它似乎并没有加快 GCF 的速度,并且仍然运行 30 分钟甚至更长时间,从而导致平台其他部分出现 TimeoutErrors。

我已经检查过:

  1. 资源利用率:使用 400 MB 的 8GB
  2. 并发请求限制:设置为 8 然后意识到此设置仅适用于入站流量

这是我的云函数代码


import pandas as pd
import os

import asyncio

from langchain.chat_models import ChatOpenAI
from langchain.prompts.chat import HumanMessagePromptTemplate, ChatPromptTemplate
from langchain.output_parsers import PydanticOutputParser
from langchain.schema.output_parser import OutputParserException 

from langchain.schema.messages import get_buffer_string
from langchain.chains import LLMChain

from functools import wraps


import platform
if platform.system() == "Windows":
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

OPEN_AI_KEY = os.environ.get("OPENAI_TOKEN")
OPENAI_MODEL = "gpt-3.5-turbo-1106"


def request_concurrency_limit_decorator(limit=5):
    # Bind the default event loop
    sem = asyncio.Semaphore(limit)

    def executor(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            async with sem:
                return await func(*args, **kwargs)

        return wrapper

    return executor


async def translate_concurrently(
    df: pd.DataFrame, openai_api_key: str = OPEN_AI_KEY, model_name: str = OPENAI_MODEL
):
    # Creating the prompt
    PROMPT_TRANSLATE_REVIEW = """Translate this into English: '{input}'"""
    llm = ChatOpenAI(openai_api_key=openai_api_key, model_name=model_name)
    message = HumanMessagePromptTemplate.from_template(
        template=PROMPT_TRANSLATE_REVIEW,
    )
    chat_prompt = ChatPromptTemplate.from_messages([message])
    chain = LLMChain(llm=llm, prompt=chat_prompt)

    # start async translation requests
    @request_concurrency_limit_decorator(limit=5)
    async def async_translate(chain: LLMChain, input: str):
        resp = await chain.arun({"input": input})
        return resp

    tasks = [async_translate(chain, input=review) for review in df["original_text"]]

    # Row order is guaranteed by asyncio.gather
    df["text_translation"] = await asyncio.gather(*tasks)


def main(ctx):
    # preparing the translation_df with a column "original_text"
    translation_df = pd.DataFrame(...)

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(translate_concurrently(translation_df))
    loop.close()

    return "OK", 200

我将不胜感激任何关于我出错的地方以及为什么 GCF 没有像我在 Jupyter Notebook 中的原型函数那样加速的提示

python google-cloud-platform 并发 google-cloud-run

评论

0赞 guillaume blaquiere 11/13/2023
为什么不为每次翻译调用 Cloud Functions?
0赞 Manuel Huppertz 11/13/2023
因为这样一来,我就必须管理将original_texts路由到每个函数,支付多个实例,并确保翻译最终出现在表的右行,即更多的开销。此外,我不知道如何对这样的架构进行原型设计,这将大大增加我的开发时间。
0赞 Sandeep Vokkareni 11/13/2023
您能否看一下本文档中分享的建议。
0赞 Manuel Huppertz 11/13/2023
这是一篇很棒的文章,启发了我使用信号量。不过,我无法真正理解如何将langchain arun函数与aiohttp中的会话逻辑结合使用,这就是为什么我选择了没有它的解决方案。我想我可能需要按照本文的思路重写我的代码,并且没有langchain,但我至少想理解为什么它一开始就不起作用。

答:

0赞 Manuel Huppertz 11/17/2023 #1

据我所知,OpenAI API 有时需要非常长的时间才能响应请求——甚至超过 10 分钟。这些调用不会失败,因此无法正常处理。

因此,绝大多数请求都会在几秒钟内得到响应,但每 20 次左右的提示,就会花费很长时间并降低整个 Cloud Function 的性能。

我找不到一种在langchain库框架内有效强制超时的方法。

最后,我重写了错误处理,并在不使用langchain的情况下重试逻辑。有些请求仍然会永远运行,但使用 asyncio,我可以强制超时并启动一个新请求,该请求通常运行得更快。