提问人:Mike 提问时间:6/17/2023 更新时间:6/17/2023 访问量:68
Celery ChordError 从深度嵌套的工作流传播并在错误处理程序中捕获它们
Celery ChordError propagation from deeply nested workflows and catching them in Error Handlers
问:
稍微介绍一下上下文,我们正在运行 Python 3.11.2 和 Celery 5.3
我在我们团队构建的新应用程序中有一个相当复杂的芹菜用例。它嵌套了一堆链/组/和弦,这些链/组/和弦通常都按预期工作。
我们希望在任务/工作流运行器中添加一个轻量级的工作流包装器,以便我们可以跟踪给定工作流的运行时间。在与我的高级开发人员来回交流后,我们想出了以下包装器。它非常简单,完全满足了我们的需求。
return ( # Create a chain to start the workflow and return the AsynchResult
workflow_factory(*args, **kwargs).on_error(workflow_error_handler.s(workflow.id))
| workflow_success_handler.si(workflow.id)
).delay()
workflow_factor
是来自我们应用程序中其他地方的任何已注册工作流,它需要返回一个链或组,然后我们可以将处理程序附加到该链或组。假设没有错误,它应该处理任务的所有清理工作。on_error
workflow_success_handler
通常,这按预期工作。工作流要么成功,要么调用我们的成功处理程序,一切都按照我们的预期发生。或者,也许存在故障,是否可以调用错误处理程序,然后再次处理所有内容。
但是,我们有一个工作流,它实际上是所有其他工作流的顶峰:
return (
notify_workflow_started.si('EXAMPLE MAIN', day, company_ids),
get_Z_workflow(*args),
chord(
[
get_A_workflow(*args),
get_B_workflow(*args),
get_C_workflow(*args),
get_D_workflow(*args),
get_E_workflow(*args),
],
get_F_workflow(*args),
get_G_workflow(*args, omit_if_data_exists=True)
),
notify_workflow_finished.si('EXAMPLE MAIN', day, company_ids),
)
这就是我们的“主要”workflow_factor返回的内容 - 出于显而易见的原因,我混淆了名称,我认为这里重要的是工作流程结构,而不是逻辑本身。
这已经是相当嵌套的 - 但它更深入,让我们以工作流 C 为例,因为它是最深入的,也是我现在正在测试最多的地方......
调用时,它将返回以下内容:get_C_workflow
return (
notify_workflow_started.si('EXAMPLE C', *args),
get_example_c_stage_1_workflow(*args),
get_example_c_stage_2_workflow(*args),
notify_workflow_finished.si('EXAMPLE C', *args)
)
现在这种情况继续更深入,让我们看看stage_1作为收益,我正在努力朝着我在任务级别强制异常的地方努力。
stage_1返回以下内容:
return chain(
notify_workflow_started.si('EXAMPLE C STAGE 1', *args),
chord(
[
tasks.calculate_A_subfactors.si(*args),
tasks.calculate_B_subfactors.si(*args),
tasks.calculate_C_subfactors.si(*args),
tasks.calculate_D_subfactors.si(*args),
tasks.calculate_E_subfactors.si(*args),
tasks.calculate_F_subfactors.si(*args),
tasks.calculate_G_subfactors.si(*args),
],
notify_workflow_finished.si('EXAMPLE C STAGE 1', *args),
)
)
在这里,我们终于看到了在这个大型工作流中运行的一些实际任务。问题在于,在我的示例中,我强行引发了一个异常,以便我们可以测试工作流包装器本身如何处理工作流中发生的各种错误。calculate_C_subfactors
问题在于,我们似乎“嵌套得太深了”,因为现在当引发异常时,error_handler不会被调用。我们清楚地看到错误在芹菜文档中引发了一次,这是实际的自定义异常本身。然后,在一些日志之后,由于有其他任务并行运行,我们看到 ChordError 是由我们引发的异常触发的 - 但它永远不会到达 ErrorHandler。随后,由于发生了某些事情,当所有其他任务和子工作流完成时,我们的成功处理程序也不会被调用
真正有趣的部分是,此工作流由其他较小的工作流组成。因此,我可以直接调用并运行,这有效地从组/链/和弦中删除了一个“层”,在这种情况下,错误处理程序按预期工作。它捕获错误,做它需要做的事情,一切都很棒。get_C_workflow
关于CHordErrors/Task Errors如何通过嵌套链/和弦/组传播,我们是否遗漏了某些会导致此行为的内容?
我们试图用隐式和弦(一个组和一个其他任务的链)交换显式和弦,我在其他一些帖子中看到了一些与异常如何传播相关的设置,这些设置对行为没有影响。
我们使用 redis 后端运行,因此我们知道任务状态正在正确管理,因为这是能够使用和弦的先决条件。
在过去的一周里,这完全难倒了我和另外两名开发人员,我们似乎找不到前进的道路。很想从任何可能知道这里发生的事情的人那里得到一些见解。
答: 暂无答案
评论