Asked by: Greencolor  Asked: 9/12/2023  Last edited by: Greencolor  Updated: 9/12/2023  Views: 47
Can't write the stream data into table using autoloader
Q:
I am trying to move my data from the bronze layer to the silver layer, looking only at the change data feed of the source.
I am reading the data as follows:
df = spark.readStream.format("delta") \
    .option("readChangeFeed", "true") \
    .table("mdp_prd.bronze.nrq_customerassetproperty_autoloader_nodups")
Then I have a function that is supposed to do some operations on the df created above; it also includes the merge that upserts the result into the silver layer:
import re

from pyspark.sql.functions import col
from delta.tables import DeltaTable

def update_changefeed(df, epochId):
    # Keep only the change types that need to be applied to the silver table
    filtered_df = df.filter(col("_change_type").isin("insert", "update_postimage", "delete"))
    filtered_df.createOrReplaceTempView("tempVW_test2")

    # Pivot the key properties into columns and join them back onto each row
    dfUpdates = sqlContext.sql("""
        SELECT
            -- `id`,
            BK_ID,
            t1.nrq_customerassetpropertyid,
            Level1,
            Level2,
            Level3,
            Yearofmanufacture,
            Make,
            IsDelete
        FROM tempVW_test2 AS t1
        /* Add key properties as columns */
        LEFT JOIN (
            SELECT
                nrq_customerassetpropertyid,
                MAX(CASE WHEN nrq_assetpropertyidname = 'Level 1' THEN nrq_value END) AS Level1,
                MAX(CASE WHEN nrq_assetpropertyidname = 'Level 2' THEN nrq_value END) AS Level2,
                MAX(CASE WHEN nrq_assetpropertyidname = 'Level 3' THEN nrq_value END) AS Level3,
                MAX(CASE WHEN nrq_assetpropertyidname = 'Make' THEN nrq_value END) AS Make,
                MAX(CASE WHEN nrq_assetpropertyidname = 'Year of manufacture' THEN nrq_value END) AS Yearofmanufacture
            FROM tempVW_test2
            GROUP BY nrq_customerassetpropertyid
        ) AS t2 ON t1.nrq_customerassetpropertyid = t2.nrq_customerassetpropertyid
    """)
    print(dfUpdates)

    # Build the merge condition from the business-key (BK_) columns
    p = re.compile('^BK_')
    list_of_columns = dfUpdates.columns
    list_of_BK_columns = [s for s in dfUpdates.columns if p.match(s)]

    string = ''
    for column in list_of_BK_columns:
        string += f'table.{column} = newData.{column} and '
    string = string[:-4]  # drop the trailing "and "

    string_insert = ''
    for column in list_of_BK_columns:
        string_insert += f'table.{column} = newData.{column} and '
    string_insert = string_insert[:-4]  # not used further below

    # Map every column to its value from the incoming batch
    dictionary = {}
    for key in list_of_columns:
        dictionary[key] = f'newData.{key}'
    # print("printing " + cdm + " columns")
    print(dfUpdates.columns)

    deltaTable = DeltaTable.forPath(spark, f"abfss://[email protected]/D365/nrq_customerassetproperty_autoloader_nodups")
    deltaTable.alias('table') \
        .merge(dfUpdates.alias("newData"), string) \
        .whenMatchedUpdate(set=dictionary) \
        .whenNotMatchedInsert(values=dictionary) \
        .execute()
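To rule out the merge logic itself, the same function can also be exercised on a one-off batch read of the change feed instead of the stream; a minimal sketch, assuming an illustrative starting version that lies within the range for which change data feed was enabled:

# Batch read of the change feed (startingVersion is only illustrative)
static_df = spark.read.format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", 2) \
    .table("mdp_prd.bronze.nrq_customerassetproperty_autoloader_nodups")

# Call the merge function directly with a dummy epoch id
update_changefeed(static_df, 0)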
As a final step, I try to write it out as follows:
df.writeStream \
    .format("delta") \
    .foreachBatch(lambda df, epochId: update_changefeed(df, epochId)) \
    .option("checkpointLocation", checkpoint_directory) \
    .trigger(availableNow=True) \
    .start()
But I get the error:
Queries with streaming sources must be executed with writeStream.start(); tahoe
A: No answers yet.