当存在流式聚合时,不支持追加输出模式

Append output mode not supported when there are streaming aggregations

提问人:Greencolor 提问时间:8/9/2023 更新时间:8/11/2023 访问量:68

问:

我有问题。正如这里提到的,我需要在下面的代码中将Append output mode not supported when there are streaming aggregationsmodifiedongroupby

agg = df.groupBy("id","modifiedon").agg(max("modifiedon").alias("modifiedon"))

from pyspark.sql.functions import max
df = df.dropDuplicates()
df = df.withWatermark("modifiedon", "1 day")
agg = df.groupBy("id").agg(max("modifiedon").alias("modifiedon"))
final =df.join(agg, on=["id", "modifiedon"], how="inner")
dfUpdates = final.withColumnRenamed("id","BK_id")

但这会产生问题,因为其中仍然包含重复的 s。由于我没有在groupby中添加该列,因此以后我遇到了执行增量表的问题。finalIdmerge into

final.writeStream.format("delta").foreachBatch(update_insert).option("checkpointLocation", checkpoint_directory).trigger(availableNow=True).start("abfss://[email protected]/D365/msdyn_workorder_autoloader_nodups")
Azure Pyspark Databricks 自动加载

评论


答:

0赞 JayashankarGS 8/11/2023 #1

在这里,您需要通过 with function 或 timestamp 列在组中给出时间戳类型列, 但是在你的情况下,你不能给出列,即使它是时间戳类型,因为你的要求是对列本身进行聚合。windowmodifiedonmodifiedon

因此,正如我之前提到的,使用具有更多天数的窗口,您确定这属于您的数据范围。

在这里,我使用了 20000 天。

agg = df.groupBy(window("modifiedon","20000 day"),"id").agg(max("modifiedon").alias("modifiedon"))
final =df.join(agg, on=["id", "modifiedon"], how="inner")

和输出。

final.writeStream.format("delta").option("checkpointLocation", "/csv_chk_pnt/").start("/out_csv/final/")

enter image description here

对于很长的旧记录,给出大量的天数,甚至可能是 50000 天。