Asked by: Oscar Legat · Asked: 10/17/2023 · Last edited by: Oscar Legat · Updated: 10/17/2023 · Views: 27
Nested Sliding Windows and append mode
Q:
I have a nested sliding window in PySpark with a Kafka sink in outputMode = "append". With only the first sliding-window groupBy, I get windowed grouped output matching my initial 5-minute window size. But as soon as I add the nested sliding window, nothing is emitted to the queue. My code is below:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, window, stddev, collect_list, max as max_, to_json, struct
from pyspark.sql.types import IntegerType, FloatType, StringType, LongType, StructType, StructField
import pyspark.sql.functions as F
from pyspark.sql.window import Window
spark = SparkSession.builder \
    .appName("Sliding Window with Kafka Sink") \
    .config("spark.sql.streaming.statefulOperator.checkCorrectness.enabled", "false") \
    .getOrCreate()
spark.sparkContext.setLogLevel("WARN")
# Define the schema
schema = StructType([
    StructField("ukLongStamp", LongType()),
    StructField("longStamp", LongType()),
    StructField("id_tarea", IntegerType()),
    StructField("fecha", StringType()),
    StructField("tiempo_ejecucion_ms", FloatType()),
    StructField("proceso_principal", StringType()),
    StructField("subproceso", StringType())
])
KAFKA_BROKERS = "172.17.0.1:29092"
KAFKA_TOPIC_INPUT = "transactions-data-process"
KAFKA_TOPIC_OUTPUT = "transactions-data-result"
# Read the data
data_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BROKERS) \
    .option("startingOffsets", "latest") \
    .option("subscribe", KAFKA_TOPIC_INPUT) \
    .option("failOnDataLoss", "false") \
    .load() \
    .select(from_json(col("value").cast("string"), schema).alias("data"), col("timestamp")) \
    .select("data.*", "timestamp")
windows_5min_agg = data_stream \
    .withWatermark("timestamp", "40 seconds") \
    .groupBy(window(col("timestamp"), "5 minutes",
                    "1 minutes 40 seconds").alias("windows_5min_agg"),
             "proceso_principal", "subproceso") \
    .agg(
        collect_list(struct("tiempo_ejecucion_ms", "timestamp")).alias("struct_list")
    )
windows_5min_agg = windows_5min_agg \
    .withColumn("windows_5min_agg_start", col("windows_5min_agg.start").cast("timestamp")) \
    .withColumn("windows_5min_agg_end", col("windows_5min_agg.end").cast("timestamp"))
exploded_5min = windows_5min_agg.selectExpr(
    "proceso_principal", "subproceso",
    "windows_5min_agg_start", "windows_5min_agg_end",
    "explode(struct_list) as struct_values")
exploded_5min = exploded_5min.selectExpr(
    "proceso_principal", "subproceso",
    "windows_5min_agg_start", "windows_5min_agg_end",
    "struct_values.tiempo_ejecucion_ms as tiempo_ejecucion_ms",
    "struct_values.timestamp as timestamp")
# 30-second window operations
windows_30sec_agg = exploded_5min \
    .withWatermark("timestamp", "8 minutes") \
    .groupBy(window(col("timestamp"), "30 seconds",
                    "25 seconds").alias("windows_30sec_agg"),
             "windows_5min_agg_start", "windows_5min_agg_end",
             "proceso_principal", "subproceso") \
    .agg(stddev("tiempo_ejecucion_ms").alias("stddev_30sec_agg"),
         F.max("timestamp").alias("timestamp_max"))
windows_30sec_agg = windows_30sec_agg \
    .withColumn("windows_30sec_agg_start", col("windows_30sec_agg.start").cast("timestamp")) \
    .withColumn("windows_30sec_agg_end", col("windows_30sec_agg.end").cast("timestamp"))
windows_30sec_agg_kafka = windows_30sec_agg \
    .selectExpr("windows_30sec_agg_start", "timestamp_max", "to_json(struct(*)) AS value")
#windows_5mins_agg_kafka = windows_5mins_agg_kafka.withWatermark("timestamp_max", "2 minutes")
# Write to Kafka with 'append' output mode
query_kafka = windows_30sec_agg_kafka \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BROKERS) \
    .option("topic", KAFKA_TOPIC_OUTPUT) \
    .option("checkpointLocation", "/opt/bitnami/spark/checkpointuu") \
    .outputMode("append") \
    .start()
# Write to console
#query_console = windows_5mins_agg_kafka \
# .writeStream \
# .format("console") \
# .outputMode("update") \
# .option("truncate", False) \
# .start()
query_kafka.awaitTermination()
#query_console.awaitTermination()
I expect the output of the smaller 30-second windows to be nested inside the larger 5-minute windows, but I get nothing at all.
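For reference, here is a minimal pure-Python sketch (independent of Spark) of how an event timestamp maps onto overlapping sliding windows, assuming windows are aligned to the epoch with no startTime offset, which matches Spark's default for window(). With the first groupBy's 5-minute (300 s) window and 1 m 40 s (100 s) slide, every event belongs to three overlapping windows:

```python
def sliding_windows(ts, duration, slide):
    """Return the [start, end) windows containing event time `ts`
    (all values in seconds): every start is a multiple of `slide`
    and satisfies start <= ts < start + duration."""
    # Smallest aligned start strictly greater than ts - duration.
    start = ((ts - duration) // slide) * slide + slide
    windows = []
    while start <= ts:
        windows.append((start, start + duration))
        start += slide
    return windows

# A 5-minute window sliding every 1 minute 40 seconds, as in the first groupBy:
print(sliding_windows(250, duration=300, slide=100))
# → [(0, 300), (100, 400), (200, 500)] — each event lands in ceil(300/100) = 3 windows
```

This is only meant to illustrate the window-assignment arithmetic; it does not model watermarks or output modes, which govern when a window is actually emitted in append mode.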
A: No answers yet