在芹菜工人上加载和卸载模型

Load and unload model on celery worker

提问人:UnPapeur 提问时间:11/16/2023 更新时间:11/20/2023 访问量:33

问:

我目前有一个系统,可以使用 Celery 在耳语 AI 模型上启动任务。但是,现有设置涉及在每个任务中加载模型,这是次优的,因为重复加载过程会消耗大量时间。不幸的是,我无法承受持续加载模型的费用,因为其他系统需要访问 GPU VRAM。

为了解决这个问题,我正在考虑实现一个系统,该系统在收到任务时加载模型,并在没有任务后将其卸载。这种方法旨在通过最大限度地减少模型在内存中加载的时间来优化资源利用率,从而确保 GPU 资源的有效利用。我相信这种调整将提高整体系统性能和响应能力。

这是这个想法(显然不起作用)

from celery import Celery, Task
from celery.signals import (
    task_received,
    celeryd_after_setup,
    task_success,
    task_failure,
)
import redis
import torch
import whisper
r = redis.Redis(host=hostname)

model = None

def checkActivesTasksWorker():
    global model
    if r.llen(activesTasksWorker) == 0:
        # Model is deleted when there is no task left on worker
        del model
        torch.cuda.empty_cache()

@celery.task
def myTask():
    launchAnalyticsWhisper(model)

@task_received.connect
def taskReceivedHandler(sender, request, **kwargs):
    if r.llen(activesTasksWorker) == 0:
        model = whisper.load_model("medium")
    r.lpush(activesTasksWorker, request.id)

@task_success.connect(sender=myTask)
def taskSuccessHandler(sender, result, **kwargs):
    r.lrem(activesTasksWorker, 1, result["taskId"])
    checkActivesTasksWorker()


@task_failure.connect(sender=myTask)
def taskFailureHandler(sender, task_id, exception, **kwargs):
    r.lrem(activesTasksWorker, 1, task_id)
    checkActivesTasksWorker()

@celeryd_after_setup.connect
def initList(sender, instance, **kwargs):
    # Clear the active tasks for the worker
    r.delete(activesTasksWorker)

model 在所有位置都是 None,因此全局变量不起作用。 你有这个想法来做这项工作吗?

python redis pytorch 芹菜 openai-whisper

评论


答:

0赞 Karl 11/20/2023 #1

Celery 为每个任务生成新进程,这就是它们无法访问全局变量的原因。

最好的方法是为相关模型设置专用的推理服务器。推理服务器在启动时加载模型一次。celery 任务向推理服务器发出请求。

如果您必须在同一 GPU 上处理多个模型,则可以向推理服务器添加终端节点,以在 CPU 内存和 GPU 内存之间移动模型。这样一来,您可以保持 GPU 内存空闲,同时仍然只从磁盘加载一次模型。

评论

0赞 UnPapeur 11/22/2023
感谢您的回答,我认为这是一个很好的方法,我会尝试的!