提问人:Greencolor 提问时间:8/9/2023 更新时间:8/11/2023 访问量:68
当存在流式聚合时,不支持追加输出模式
Append output mode not supported when there are streaming aggregations
问:
我有问题。正如这里提到的,我需要在下面的代码中将Append output mode not supported when there are streaming aggregations
modifiedon
groupby
agg = df.groupBy("id","modifiedon").agg(max("modifiedon").alias("modifiedon"))
from pyspark.sql.functions import max
df = df.dropDuplicates()
df = df.withWatermark("modifiedon", "1 day")
agg = df.groupBy("id").agg(max("modifiedon").alias("modifiedon"))
final =df.join(agg, on=["id", "modifiedon"], how="inner")
dfUpdates = final.withColumnRenamed("id","BK_id")
但这会产生问题,因为其中仍然包含重复的 s。由于我没有在groupby中添加该列,因此以后我遇到了执行增量表的问题。final
Id
merge into
final.writeStream.format("delta").foreachBatch(update_insert).option("checkpointLocation", checkpoint_directory).trigger(availableNow=True).start("abfss://[email protected]/D365/msdyn_workorder_autoloader_nodups")
答:
0赞
JayashankarGS
8/11/2023
#1
在这里,您需要通过 with function 或 timestamp 列在组中给出时间戳类型列,
但是在你的情况下,你不能给出列,即使它是时间戳类型,因为你的要求是对列本身进行聚合。window
modifiedon
modifiedon
因此,正如我之前提到的,使用具有更多天数的窗口,您确定这属于您的数据范围。
在这里,我使用了 20000 天。
agg = df.groupBy(window("modifiedon","20000 day"),"id").agg(max("modifiedon").alias("modifiedon"))
final =df.join(agg, on=["id", "modifiedon"], how="inner")
和输出。
final.writeStream.format("delta").option("checkpointLocation", "/csv_chk_pnt/").start("/out_csv/final/")
对于很长的旧记录,给出大量的天数,甚至可能是 50000 天。
评论