Asked by: Ravi Kiran.M  Asked: 11/17/2023  Updated: 11/17/2023  Views: 13
Joining aggregate streaming dataframe with streaming dataframe in PySpark
Q:
Here is the code I am trying to run:
from pyspark.sql import functions as F  # needed for F.window, F.first, F.expr

streaming_df = (
    spark.readStream
    .option("option1", "value1")
    # NOTE: the source format and the .load() call are elided in the question;
    # spark.readStream alone returns a DataStreamReader, not a streaming DataFrame.
)
agg_df = (
    streaming_df
    .filter("filter1 = 'value1'")
    .select('col1', 'timestamp')
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        F.window('timestamp', "5 minutes"),
        'col1')
    .agg(F.first('timestamp', ignorenulls=True).alias('watermarked_timestamp'))
    .withColumnRenamed("col1", "watermarked_col1")
)
watermarked_agg_df = agg_df.withWatermark("watermarked_timestamp", "10 minutes")
watermarked_streaming_df = (
    streaming_df
    .withColumnRenamed("streaming_col1", "watermarked_streaming_col1")
    .withColumnRenamed("timestamp_processed", "watermarked_streaming_timestamp")
    .withWatermark("watermarked_streaming_timestamp", "10 minutes")
)
join_df = (
    watermarked_streaming_df.join(
        watermarked_agg_df,
        how='inner',
        on=F.expr("""
            watermarked_streaming_col1 = watermarked_col1 AND
            watermarked_streaming_timestamp >= watermarked_timestamp AND
            watermarked_streaming_timestamp <= watermarked_timestamp + interval 5 minute
        """))
)
I can see data in watermarked_agg_df using watermarked_agg_df.display(), but join_df shows no data at all. I have confirmed that both streams contain rows that satisfy the join condition.
Can anyone tell me what I am missing here?
I tried adjusting the watermark timestamps and the window timestamp, but to no avail.
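As an aside, the time-range predicate in the join above admits a streaming row only when its timestamp falls within five minutes after the aggregate row's timestamp. That window arithmetic can be sanity-checked in plain Python, independent of Spark (the helper name and the timestamps below are made up for illustration):

```python
from datetime import datetime, timedelta

def within_join_window(stream_ts, agg_ts, window=timedelta(minutes=5)):
    """Mirror the join's time-range predicate:
    agg_ts <= stream_ts <= agg_ts + 5 minutes."""
    return agg_ts <= stream_ts <= agg_ts + window

base = datetime(2023, 11, 17, 12, 0, 0)
# A streaming event 3 minutes after the aggregate row matches...
print(within_join_window(base + timedelta(minutes=3), base))  # True
# ...but one 6 minutes after falls outside the 5-minute bound.
print(within_join_window(base + timedelta(minutes=6), base))  # False
```

If this predicate behaves as expected on known timestamps, the empty join result is more likely due to watermark or aggregation semantics than to the condition itself.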
A: No answers yet