气流 DAG 运行:用莳萝解腌出现 NameError:未定义名称“timedelta”

airflow DAG run: unpickling with dill gives NameError: name 'timedelta' is not defined

提问人:Felix 提问时间:11/7/2023 更新时间:11/8/2023 访问量:35

问:

我有一个简单的 DAG(启用了选项)。我正在尝试将 a 传递给 ,但是当它开始执行时,它失败并出现错误。render_template_as_native_obj{{ dag_run }}PythonVirtualenvOperatorNameError: name 'timedelta' is not defined

我正在尝试使用它,并且. 我该如何处理这个错误?Python 3.11airflow==2.7.3dill==0.3.7

这是我的 DAG:

import datetime
from pathlib import Path
import airflow
from airflow import DAG
from airflow.operators.python import PythonOperator, PythonVirtualenvOperator
import dill


dag = DAG(
    dag_id='strange_pickling_error_dag',
    schedule_interval='0 5 * * 1',
    start_date=datetime.datetime(2020, 1, 1),
    catchup=False,
    render_template_as_native_obj=True,
)


context = {"ts": "{{ ts }}", "dag_run": "{{ dag_run }}"}
op_args = [context, Path(__file__).parent.absolute()]


def make_foo(*args, **kwargs):
    print("---> making foo!")
    print("make foo(...): args")
    print(args)
    print("make foo(...): kwargs")
    print(kwargs)


make_foo_task = PythonVirtualenvOperator(
    task_id='make_foo',
    python_callable=make_foo,
    use_dill=True,
    system_site_packages=False,
    op_args=op_args,
    requirements=[f"dill=={dill.__version__}", f"apache-airflow=={airflow.__version__}"],
    dag=dag)

错误如下:

[2023-11-06, 18:23:21 UTC] {process_utils.py:182} INFO - Executing cmd: /tmp/venvqse65m1b/bin/python /tmp/venvqse65m1b/script.py /tmp/venvqse65m1b/script.in /tmp/venvqse65m1b/script.out /tmp/venvqse65m1b/string_args.txt /tmp/venvqse65m1b/termination.log
[2023-11-06, 18:23:21 UTC] {process_utils.py:186} INFO - Output:
[2023-11-06, 18:23:21 UTC] {process_utils.py:190} INFO - Traceback (most recent call last):
[2023-11-06, 18:23:21 UTC] {process_utils.py:190} INFO -   File "/tmp/venvqse65m1b/script.py", line 17, in <module>
[2023-11-06, 18:23:21 UTC] {process_utils.py:190} INFO -     arg_dict = dill.load(file)
[2023-11-06, 18:23:21 UTC] {process_utils.py:190} INFO -                ^^^^^^^^^^^^^^^
[2023-11-06, 18:23:21 UTC] {process_utils.py:190} INFO -   File "/tmp/venvqse65m1b/lib/python3.11/site-packages/dill/_dill.py", line 287, in load
[2023-11-06, 18:23:21 UTC] {process_utils.py:190} INFO -     return Unpickler(file, ignore=ignore, **kwds).load()
[2023-11-06, 18:23:21 UTC] {process_utils.py:190} INFO -            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[2023-11-06, 18:23:21 UTC] {process_utils.py:190} INFO -   File "/tmp/venvqse65m1b/lib/python3.11/site-packages/dill/_dill.py", line 442, in load
[2023-11-06, 18:23:21 UTC] {process_utils.py:190} INFO -     obj = StockUnpickler.load(self)
[2023-11-06, 18:23:21 UTC] {process_utils.py:190} INFO -           ^^^^^^^^^^^^^^^^^^^^^^^^^
[2023-11-06, 18:23:21 UTC] {process_utils.py:190} INFO -   File "/home/felix/Projects/alfabank/fs_etl/venv_py3.11/lib/python3.11/site-packages/pendulum/tz/timezone.py", line 312, in __init__
[2023-11-06, 18:23:21 UTC] {process_utils.py:190} INFO -     self._utcoffset = timedelta(seconds=offset)
[2023-11-06, 18:23:21 UTC] {process_utils.py:190} INFO -                       ^^^^^^^^^
[2023-11-06, 18:23:21 UTC] {process_utils.py:190} INFO - NameError: name 'timedelta' is not defined
[2023-11-06, 18:23:22 UTC] {taskinstance.py:1937} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/felix/Projects/alfabank/fs_etl/venv_py3.11/lib/python3.11/site-packages/airflow/operators/python.py", line 395, in execute
    return super().execute(context=serializable_context)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/felix/Projects/alfabank/fs_etl/venv_py3.11/lib/python3.11/site-packages/airflow/operators/python.py", line 192, in execute
    return_value = self.execute_callable()
                   ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/felix/Projects/alfabank/fs_etl/venv_py3.11/lib/python3.11/site-packages/airflow/operators/python.py", line 609, in execute_callable
    result = self._execute_python_callable_in_subprocess(python_path, tmp_path)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/felix/Projects/alfabank/fs_etl/venv_py3.11/lib/python3.11/site-packages/airflow/operators/python.py", line 446, in _execute_python_callable_in_subprocess
    execute_in_subprocess(
  File "/home/felix/Projects/alfabank/fs_etl/venv_py3.11/lib/python3.11/site-packages/airflow/utils/process_utils.py", line 171, in execute_in_subprocess
    execute_in_subprocess_with_kwargs(cmd, cwd=cwd)
  File "/home/felix/Projects/alfabank/fs_etl/venv_py3.11/lib/python3.11/site-packages/airflow/utils/process_utils.py", line 194, in execute_in_subprocess_with_kwargs
    raise subprocess.CalledProcessError(exit_code, cmd)
subprocess.CalledProcessError: Command '['/tmp/venvqse65m1b/bin/python', '/tmp/venvqse65m1b/script.py', '/tmp/venvqse65m1b/script.in', '/tmp/venvqse65m1b/script.out', '/tmp/venvqse65m1b/string_args.txt', '/tmp/venvqse65m1b/termination.log']' returned non-zero exit status 1.
Airflow virtualenv 泡菜 莳萝

评论

2赞 Anand Vidvat 11/7/2023
这似乎与摆锤模块有关。您能否指定一个较旧的版本和最新版本来测试此错误是否可以在不同版本之间重现?
0赞 Felix 11/7/2023
好吧,我已经尝试过 pendulum 2.0.0 和 2.1.2 - 使用它们时,我都遇到了相同的错误。
0赞 Danila Ganchar 11/8/2023
@Felix您能解释一下为什么您使用而不是吗? + 已安装。PythonVirtualenvOperatorPythonOperatorairflowdill
0赞 Felix 11/8/2023
@DanilaGanchar,我们必须使用运算符,因为在不同的 DAG 中,我们有不同的(有时是冲突的)Python 包依赖项列表。PythonVirtualenvOperator

答:

1赞 Felix 11/8/2023 #1

经过一番调查,我发现:

  • 此问题发生在 时。它不会发生;timedeltadill >= 3.6dill==0.3.5.1

  • 这个问题可以通过使用 来解决,但目前不支持(已经有一个 PR: https://github.com/apache/airflow/pull/34744,但尚未合并到 master 分支中)。pendulum==3.0airflow

评论

1赞 Anand Vidvat 11/9/2023
感谢您跟进这个!