提问人:JTPing 提问时间:10/22/2023 更新时间:10/22/2023 访问量:19
“-A”/“--app”的值无效:无法加载芹菜应用程序。“str”对象在将 Celery 与 Flask 集成时没有属性“level”?
Invalid value for '-A' / '--app': Unable to load celery application. 'str' object has no attribute 'level' when Integrating Celery with Flask?
问:
为了发出长时间运行的任务,例如在后台作业中删除帖子,我决定根据 Flask 官方教程将 Celery 与 Flask 集成为新的初学者 Background Tasks with Celery 但是出现错误报告:
`[2023-10-22 18:15:47,668\] INFO in __init__: Microblog startup
[2023-10-22 18:15:47,684\] INFO in __init__: Microblog startup
Usage: celery \[OPTIONS\] COMMAND \[ARGS\]...
Try 'celery --help' for help.
Error: Invalid value for '-A' / '--app':
Unable to load celery application.
'str' object has no attribute 'level'`
文件结构如下:
--app
--__init__.py
--make_celery.py
以下代码为:
// The __init__.py
def celery_init_app(app: Flask):
class FlaskTask(Task):
def __call__(self, *args: object, **kwargs: object):
with app.app_context():
return self.run(*args, **kwargs)
celery_app = Celery(app.name, task_cls=FlaskTask)
celery_app.config_from_object(app.config['CELERY'])
celery_app.set_default()
app.extensions['celery'] = celery_app
return celery_app
// call the celery_init_app in factory function
def create_app(config_class=Config):
app.config.from_object(config_class)
app.config.from_mapping(
CELERY=dict(
broker_url = "pyamqp://guest@localhost//",
result_backend = "redis://localhost",
task_serializer = 'json',
result_serializer = 'json',
accept_content = ['json'],
task_ignore_result = True,
),
)
app.config.from_prefixed_env()
// ... other configuration code
celery_init_app(app)
return app
make_celery代码为:
from app import create_app
flask_app = create_app()
celery_app = flask_app.extensions["celery"]`
如果有人给我帮助,我将不胜感激。
我试着在网上搜索相关问题,但我所有的努力都是徒劳的。
答: 暂无答案
评论