提问人:Manuel Huppertz 提问时间:11/13/2023 最后编辑:Manuel Huppertz 更新时间:11/17/2023 访问量:89
在 Google Cloud Functions 的请求中使用并发和 python asyncio
Using concurrency with python asyncio in requests in Google Cloud Functions
问:
我正在尝试向 OpenAI API 发送请求,以通过 Google Cloud Function 翻译文本列表。由于 API 需要一点时间才能回答,并且我尝试每天翻译几百条文本,因此我尝试通过将请求从 GCF 并发发送到 OpenAI API 来加快流程。
因此,我在 Jupyter Notebook 中用 Python 制作了该函数的原型,并设法运行了一个并发函数,因为我知道事件循环管理存在一些差异,但最终将运行时间从大约 30 分钟减少到 4-5 分钟。然后,我试图弄清楚在Jupyter和Google Cloud Function中运行该函数之间的区别,并最终设法让它在那里运行。
但是,它似乎并没有加快 GCF 的速度,并且仍然运行 30 分钟甚至更长时间,从而导致平台其他部分出现 TimeoutErrors。
我已经检查过:
- 资源利用率:使用 400 MB 的 8GB
- 并发请求限制:设置为 8 然后意识到此设置仅适用于入站流量
这是我的云函数代码
import pandas as pd
import os
import asyncio
from langchain.chat_models import ChatOpenAI
from langchain.prompts.chat import HumanMessagePromptTemplate, ChatPromptTemplate
from langchain.output_parsers import PydanticOutputParser
from langchain.schema.output_parser import OutputParserException
from langchain.schema.messages import get_buffer_string
from langchain.chains import LLMChain
from functools import wraps
import platform
if platform.system() == "Windows":
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
OPEN_AI_KEY = os.environ.get("OPENAI_TOKEN")
OPENAI_MODEL = "gpt-3.5-turbo-1106"
def request_concurrency_limit_decorator(limit=5):
# Bind the default event loop
sem = asyncio.Semaphore(limit)
def executor(func):
@wraps(func)
async def wrapper(*args, **kwargs):
async with sem:
return await func(*args, **kwargs)
return wrapper
return executor
async def translate_concurrently(
df: pd.DataFrame, openai_api_key: str = OPEN_AI_KEY, model_name: str = OPENAI_MODEL
):
# Creating the prompt
PROMPT_TRANSLATE_REVIEW = """Translate this into English: '{input}'"""
llm = ChatOpenAI(openai_api_key=openai_api_key, model_name=model_name)
message = HumanMessagePromptTemplate.from_template(
template=PROMPT_TRANSLATE_REVIEW,
)
chat_prompt = ChatPromptTemplate.from_messages([message])
chain = LLMChain(llm=llm, prompt=chat_prompt)
# start async translation requests
@request_concurrency_limit_decorator(limit=5)
async def async_translate(chain: LLMChain, input: str):
resp = await chain.arun({"input": input})
return resp
tasks = [async_translate(chain, input=review) for review in df["original_text"]]
# Row order is guaranteed by asyncio.gather
df["text_translation"] = await asyncio.gather(*tasks)
def main(ctx):
# preparing the translation_df with a column "original_text"
translation_df = pd.DataFrame(...)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(translate_concurrently(translation_df))
loop.close()
return "OK", 200
我将不胜感激任何关于我出错的地方以及为什么 GCF 没有像我在 Jupyter Notebook 中的原型函数那样加速的提示
答:
0赞
Manuel Huppertz
11/17/2023
#1
据我所知,OpenAI API 有时需要非常长的时间才能响应请求——甚至超过 10 分钟。这些调用不会失败,因此无法正常处理。
因此,绝大多数请求都会在几秒钟内得到响应,但每 20 次左右的提示,就会花费很长时间并降低整个 Cloud Function 的性能。
我找不到一种在langchain库框架内有效强制超时的方法。
最后,我重写了错误处理,并在不使用langchain的情况下重试逻辑。有些请求仍然会永远运行,但使用 asyncio,我可以强制超时并启动一个新请求,该请求通常运行得更快。
评论