提问人:Moriz Bühler 提问时间:11/14/2023 最后编辑:Jason AllerMoriz Bühler 更新时间:11/15/2023 访问量:43
Airflow XCOM 拉入任务组
Airflow XCOM Pull in Taskgroup
问:
我是 Airflow 的新手,在 Python 函数和任务组之间交换变量时遇到了一些问题。(我知道这不是气流的要点,但在我的情况下是必要的)。 为了更好地理解,我添加了这个代码片段,它应该传达我打算做什么:
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.decorators import task_group
import logging
from pendulum import datetime
def push_function(**kwargs):
files = ['a','b','c']
kwargs['ti'].xcom_push(key='files', value=files)
with DAG(
"tst",
start_date=datetime(2023, 11, 7),
schedule_interval="30 6 * * *",
catchup=False,
) as dag:
push = PythonOperator(
task_id='push_task',
python_callable=push_function,
dag=dag,
)
@task_group(group_id="group")
def pull_task(**kwargs):
data= kwargs['ti'].xcom_pull(task_ids='push_task', key='files')
logging.info(f"transfaired Variable: {folders_today}")
for item in data:
filepath = f"/tmp/{item}.xml"
extract_load = SFTPOperator(
task_id=f"download_{item}",
ssh_conn_id="sftp",
remote_filepath=f"{item}.xml",
local_filepath=filepath,
operation="get",
create_intermediate_dirs=True
)
push >> pull_task()
提供的代码 所以基本上我的问题是:在这种情况下,有没有办法让我的变量:“files”从函数“push_function”到函数“pull_task”?
像这样,这条线:
folders_today= kwargs['ti'].xcom_pull(task_ids='push_task', key='files')
给我错误:
KeyError: 'ti'
但我也尝试了其他方法,并且必须理解,任务组上下文显然在此函数中不可用。但我不确定,这意味着什么,以及我如何才能使用它。
答:
0赞
mids
11/14/2023
#1
您批注的部分需要指定要在组中运行的任务的子部分:@task_group
@task_group(group_id='group')
def pull_group():
@task
def pull_function(**kwargs):
data = kwargs['ti'].xcom_pull(task_ids='push_task', key='files')
print(data)
... [more tasks]
# Routing
pull_function() # >> another_task_in_pull_group()
然后在外部上下文中,路由到该组:dag:
push >> pull_group()
评论
0赞
Moriz Bühler
11/15/2023
感谢您的回复。有没有办法在没有@task/任务流 API 的情况下访问 xcom?我在我的task_group中使用循环工作,一旦我像您在我的情况下所做的那样插入多个任务,它就不会再工作了。我的task_group如下所示: ' @task_group(group_id=“group”) def function(**kwargs): #data = kwargs['ti'].xcom_pull(task_ids='push_task', key='files') <-- 数据中项目的问题: extract_load = SFTPOperator( task_id=f“download_{item}”, remote_filepath=f“{item}.xml”, ... ) '
0赞
Moriz Bühler
11/15/2023
(我用评论中提供的问题代码更新了我的问题代码)
0赞
mids
11/15/2023
如果没有 @task 注解,你就没有一个任务实例来通过 ti 对象从 XCom 读取 files 变量 - 这是上述 KeyError 的根本原因。我很想重新设计您的逻辑以直接调用push_function,返回文件列表,作为任务组的一部分以获取文件变量,然后像您在循环中所做的那样为每个下载动态生成任务。
评论