Joining aggregate streaming dataframe with streaming dataframe in PySpark

Asked by Ravi Kiran.M on 11/17/2023 · Updated 11/17/2023 · Viewed 13 times

Q:

Here is the code I am trying to run:

from pyspark.sql import functions as F

streaming_df = (
  spark.readStream
  .option("option1", "value1")
  .load()  # source format/path omitted in the question
)

agg_df = (
  streaming_df
  .filter("filter1 = 'value1'")
  .select('col1', 'timestamp')
  .withWatermark("timestamp", "10 minutes")
  .groupBy(
    F.window('timestamp', "5 minutes"),
    'col1')
  .agg(F.first('timestamp', ignorenulls=True).alias('watermarked_timestamp'))
  .withColumnRenamed("col1", "watermarked_col1")
  )

watermarked_agg_df = agg_df.withWatermark("watermarked_timestamp", "10 minutes")

watermarked_streaming_df = (
  streaming_df
  .withColumnRenamed("streaming_col1", "watermarked_streaming_col1")
  .withColumnRenamed("timestamp_processed", "watermarked_streaming_timestamp")
  .withWatermark("watermarked_streaming_timestamp", "10 minutes")
)

join_df = (
  watermarked_streaming_df.join(watermarked_agg_df, how='inner', on=
    F.expr("""
           watermarked_streaming_col1 = watermarked_col1 AND
           watermarked_streaming_timestamp >= watermarked_timestamp AND 
           watermarked_streaming_timestamp <= watermarked_timestamp + interval 5 minute
          """))
)
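For what it's worth, the time-range predicate used in the join above can be sanity-checked in plain Python, independent of Spark. The helper and sample timestamps below are made up for illustration; they only mirror the `>= watermarked_timestamp AND <= watermarked_timestamp + interval 5 minute` condition:

```python
from datetime import datetime, timedelta

def in_join_window(stream_ts: datetime, agg_ts: datetime) -> bool:
    """Mirrors the join's time-range predicate:
    agg_ts <= stream_ts <= agg_ts + 5 minutes."""
    return agg_ts <= stream_ts <= agg_ts + timedelta(minutes=5)

agg_ts = datetime(2023, 11, 17, 12, 0, 0)
print(in_join_window(datetime(2023, 11, 17, 12, 3, 0), agg_ts))  # True: inside the 5-minute window
print(in_join_window(datetime(2023, 11, 17, 12, 6, 0), agg_ts))  # False: past the 5-minute window
```

If rows that should match pass this predicate on paper, the missing output is more likely a watermark/eviction issue than a wrong join condition.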

I can see data in watermarked_agg_df using watermarked_agg_df.display().

But join_df shows no data at all. I have made sure that both streams contain some data satisfying the join condition.

Can anyone tell me what I am missing here?

I tried adjusting the watermark timestamps and the window timestamps, but to no avail.

python join pyspark spark-structured-streaming



A: No answers yet