Airflow DAG 中用于 Snowflake SQL 查询的动态日期参数

Dynamic Date Parameter in Airflow DAG for Snowflake SQL Query

提问人:MC7836 提问时间:9/19/2023 更新时间:9/19/2023 访问量:62

问:

我正在使用带有 Airflow 的 Google Cloud Composer 在 Snowflake 上运行 SQL 查询。我需要根据条件设置日期参数,然后在我的 SQL 文件中使用该参数。具体来说,我试图根据上个月是否是季度末来确定日期。如果是这样,我想相应地调整我的参数,并在查询的所有相关位置重用它

我尝试将参数设置为多语句 SQL 文件的一部分,以便在 DAG 中运行,如下所示:

DECLARE
    VAR_QTR DATE;

BEGIN
    -- Set your VAR_QTR
    SET VAR_QTR = (
        SELECT 
        CASE 
            WHEN MONTH(CURRENT_DATE::DATE - INTERVAL '1 month') % 3 = 0 
            THEN DATE_TRUNC('quarter', CURRENT_DATE::DATE) - INTERVAL '1 quarter'
            ELSE DATE_TRUNC('quarter', CURRENT_DATE::DATE) - INTERVAL '0 quarter'
        END
    )
   ;

SOME SQL CODE HERE...
LEFT JOIN 
        TABLE dc
        ON dc.company_id = c.COMPANY_ID
        AND dc.deleted_at <= :$VAR_QTR 
END;

我在气流中遇到的错误是会话变量“$VAR_QTR”不存在。如何在 Airflow 中使用 SQL 保存在 fil 中(而不是 DAG 本身)来执行此操作

sql snowflake-cloud-data-platform 气流 参数传递 directed-acyclic-graphs

评论

0赞 astentx 9/19/2023
如果您想将所有内容都包含在 SQL 文件中,那么除了数据库本身之外,它与任何工具都无关。对于当前代码,不需要任何操作,只需使用赋值运算符:。然后将其用作绑定变量:setvar_qtr := <code>select null from some_table where some_column = :var_qtr
0赞 MC7836 9/19/2023
我的代码在我的IDE中运行良好,问题是一旦它托管在Composer中。会话变量似乎丢失了
0赞 RNHTTR 9/22/2023
您使用的是什么气流操作员?
0赞 MC7836 9/24/2023
雪花,但我欢迎这个问题的任何解决方案。甚至超出了我提供的范围

答: 暂无答案