“-A”/“--app”的值无效:无法加载芹菜应用程序。“str”对象在将 Celery 与 Flask 集成时没有属性“level”?

Invalid value for '-A' / '--app': Unable to load celery application. 'str' object has no attribute 'level' when Integrating Celery with Flask?

提问人:JTPing 提问时间:10/22/2023 更新时间:10/22/2023 访问量:19

问:

为了发出长时间运行的任务,例如在后台作业中删除帖子,我决定根据 Flask 官方教程将 Celery 与 Flask 集成为新的初学者 Background Tasks with Celery 但是出现错误报告:

  `[2023-10-22 18:15:47,668\] INFO in __init__: Microblog startup
  [2023-10-22 18:15:47,684\] INFO in __init__: Microblog startup
  Usage: celery \[OPTIONS\] COMMAND \[ARGS\]...
  Try 'celery --help' for help.

  Error: Invalid value for '-A' / '--app':
  Unable to load celery application.
  'str' object has no attribute 'level'`

文件结构如下:

   --app
     --__init__.py
   --make_celery.py

以下代码为:

// The __init__.py
def celery_init_app(app: Flask):
   class FlaskTask(Task):
      def __call__(self, *args: object, **kwargs: object):
         with app.app_context():
            return self.run(*args, **kwargs)

   celery_app  = Celery(app.name, task_cls=FlaskTask)
   celery_app.config_from_object(app.config['CELERY'])
   celery_app.set_default()
   app.extensions['celery'] = celery_app 
   return celery_app 


// call the celery_init_app in factory function
def create_app(config_class=Config):
   app.config.from_object(config_class)
   app.config.from_mapping(
    CELERY=dict(
        broker_url = "pyamqp://guest@localhost//",
        result_backend = "redis://localhost",

        task_serializer = 'json',
        result_serializer = 'json',
        accept_content = ['json'],
        task_ignore_result = True,
    ),
   )
   app.config.from_prefixed_env()

   // ... other configuration code
   celery_init_app(app)
   return app

make_celery代码为:

from app import create_app 

flask_app = create_app()
celery_app = flask_app.extensions["celery"]`

如果有人给我帮助,我将不胜感激。

我试着在网上搜索相关问题,但我所有的努力都是徒劳的。

Flask 异步 芹菜 任务

评论


答: 暂无答案