Airflow XCOM 拉入任务组

Airflow XCOM Pull in Taskgroup

提问人:Moriz Bühler 提问时间:11/14/2023 最后编辑:Jason AllerMoriz Bühler 更新时间:11/15/2023 访问量:43

问:

我是 Airflow 的新手,在 Python 函数和任务组之间交换变量时遇到了一些问题。(我知道这不是气流的要点,但在我的情况下是必要的)。 为了更好地理解,我添加了这个代码片段,它应该传达我打算做什么:

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.decorators import task_group
import logging
from pendulum import datetime

def push_function(**kwargs):
    files = ['a','b','c']
    kwargs['ti'].xcom_push(key='files', value=files)

with DAG(
    "tst",
    start_date=datetime(2023, 11, 7),
    schedule_interval="30 6 * * *",
    catchup=False,
) as dag:
    push = PythonOperator(
        task_id='push_task',
        python_callable=push_function,
        dag=dag,
    )

    @task_group(group_id="group")
    def pull_task(**kwargs):
        data= kwargs['ti'].xcom_pull(task_ids='push_task', key='files')
        logging.info(f"transfaired Variable: {folders_today}")
        for item in data:
                filepath = f"/tmp/{item}.xml"
                extract_load = SFTPOperator(
                    task_id=f"download_{item}",
                    ssh_conn_id="sftp",
                    remote_filepath=f"{item}.xml",
                    local_filepath=filepath,
                    operation="get",
                    create_intermediate_dirs=True
)
      
    push >> pull_task()

提供的代码 所以基本上我的问题是:在这种情况下,有没有办法让我的变量:“files”从函数“push_function”到函数“pull_task”?

像这样,这条线:

folders_today= kwargs['ti'].xcom_pull(task_ids='push_task', key='files')

给我错误:

KeyError: 'ti'

但我也尝试了其他方法,并且必须理解,任务组上下文显然在此函数中不可用。但我不确定,这意味着什么,以及我如何才能使用它。

airflow global-variables 关键字参数 airflow-xcom

评论


答:

0赞 mids 11/14/2023 #1

您批注的部分需要指定要在组中运行的任务的子部分:@task_group

  @task_group(group_id='group')
  def pull_group():

    @task
    def pull_function(**kwargs):
      data = kwargs['ti'].xcom_pull(task_ids='push_task', key='files')
      print(data)

    ... [more tasks]

    # Routing
    pull_function()                        # >> another_task_in_pull_group()

然后在外部上下文中,路由到该组:dag:

push >> pull_group()

评论

0赞 Moriz Bühler 11/15/2023
感谢您的回复。有没有办法在没有@task/任务流 API 的情况下访问 xcom?我在我的task_group中使用循环工作,一旦我像您在我的情况下所做的那样插入多个任务,它就不会再工作了。我的task_group如下所示: ' @task_group(group_id=“group”) def function(**kwargs): #data = kwargs['ti'].xcom_pull(task_ids='push_task', key='files') <-- 数据中项目的问题: extract_load = SFTPOperator( task_id=f“download_{item}”, remote_filepath=f“{item}.xml”, ... ) '
0赞 Moriz Bühler 11/15/2023
(我用评论中提供的问题代码更新了我的问题代码)
0赞 mids 11/15/2023
如果没有 @task 注解,你就没有一个任务实例来通过 ti 对象从 XCom 读取 files 变量 - 这是上述 KeyError 的根本原因。我很想重新设计您的逻辑以直接调用push_function,返回文件列表,作为任务组的一部分以获取文件变量,然后像您在循环中所做的那样为每个下载动态生成任务。