Can't write the stream data into table using autoloader

Asked by: Greencolor · Asked: 9/12/2023 · Last edited by: Greencolor · Modified: 9/12/2023 · Views: 47

Q:

I'm trying to move my data from the bronze layer to the silver layer, looking only at the changes to the source data.

I'm reading the data as follows:

df = spark.readStream.format("delta") \
    .option("readChangeFeed", "true") \
    .table("mdp_prd.bronze.nrq_customerassetproperty_autoloader_nodups")

Then I have a def that performs some operations on the df created above and also contains the merge that upserts it into the silver layer:

import re

from delta.tables import DeltaTable
from pyspark.sql.functions import col

def update_changefeed(df, epochId):
    # Keep only the change types that should be propagated to the silver table.
    filtered_df = df.filter(col("_change_type").isin("insert", "update_postimage", "delete"))
    filtered_df.createOrReplaceTempView("tempVW_test2")
    dfUpdates = spark.sql("""
        SELECT
            -- `id`,
            BK_ID,
            t1.nrq_customerassetpropertyid,
            Level1,
            Level2,
            Level3,
            Yearofmanufacture,
            Make,
            IsDelete
        FROM tempVW_test2 AS t1
        /* Add key properties as columns */
        LEFT JOIN (
            SELECT
                nrq_customerassetpropertyid,
                MAX(CASE WHEN nrq_assetpropertyidname = 'Level 1' THEN nrq_value END) AS Level1,
                MAX(CASE WHEN nrq_assetpropertyidname = 'Level 2' THEN nrq_value END) AS Level2,
                MAX(CASE WHEN nrq_assetpropertyidname = 'Level 3' THEN nrq_value END) AS Level3,
                MAX(CASE WHEN nrq_assetpropertyidname = 'Make' THEN nrq_value END) AS Make,
                MAX(CASE WHEN nrq_assetpropertyidname = 'Year of manufacture' THEN nrq_value END) AS Yearofmanufacture
            FROM tempVW_test2
            GROUP BY nrq_customerassetpropertyid
        ) AS t2 ON t1.nrq_customerassetpropertyid = t2.nrq_customerassetpropertyid
    """)
    print(dfUpdates)
    # Build the merge condition from the business-key (BK_) columns.
    p = re.compile('^BK_')
    list_of_columns = dfUpdates.columns
    list_of_BK_columns = [s for s in list_of_columns if p.match(s)]
    string = ' and '.join(f'table.{column} = newData.{column}' for column in list_of_BK_columns)

    # Map every target column to the corresponding value from the incoming batch.
    dictionary = {key: f'newData.{key}' for key in list_of_columns}

    print(dfUpdates.columns)

    deltaTable = DeltaTable.forPath(spark, f"abfss://[email protected]/D365/nrq_customerassetproperty_autoloader_nodups")
    deltaTable.alias('table') \
        .merge(dfUpdates.alias("newData"), string) \
        .whenMatchedUpdate(set=dictionary) \
        .whenNotMatchedInsert(values=dictionary) \
        .execute()
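
Just to make the dynamically built merge arguments concrete, for the single BK_ column selected above they evaluate to roughly the following (an illustration, not part of the notebook):

# Generated merge condition and column mapping (illustrative):
# string     -> "table.BK_ID = newData.BK_ID"
# dictionary -> {"BK_ID": "newData.BK_ID",
#                "nrq_customerassetpropertyid": "newData.nrq_customerassetpropertyid",
#                ...}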

As the final step, I try to write it out as follows:

df.writeStream \
    .format("delta") \
    .foreachBatch(lambda df, epochId: update_changefeed(df, epochId)) \
    .option("checkpointLocation", checkpoint_directory) \
    .trigger(availableNow=True) \
    .start()

But I get the error:

Queries with streaming sources must be executed with writeStream.start(); tahoe
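
For context, Spark raises this AnalysisException whenever a batch-style action is run on a DataFrame that still has a streaming source; a minimal sketch of the general pattern the message refers to (hypothetical table name, not the code above):

stream_df = spark.readStream.format("delta").table("some_table")
stream_df.count()  # AnalysisException: Queries with streaming sources must be executed with writeStream.start()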

python databricks azure-databricks autoloader

Comments


A: No answers yet