Nested Sliding Windows and append mode

Asked by: Oscar Legat | Asked: 10/17/2023 | Last edited by: Oscar Legat | Updated: 10/17/2023 | Views: 27

Q:

I have a nested sliding window in PySpark with a Kafka sink using outputMode = "append". With only the first sliding-window groupBy, I get windowed, grouped output matching my initial 5-minute window size. But when I add the nested sliding window, nothing is ever emitted to the queue. My code is as follows:

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, window, stddev, collect_list, to_json, struct
from pyspark.sql.types import IntegerType, FloatType, StringType, LongType, StructType, StructField
import pyspark.sql.functions as F



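# Note: the correctness check is disabled below; by default Spark flags streaming
# queries that chain multiple stateful (windowed) aggregations as potentially incorrect.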
spark = SparkSession.builder \
    .appName("Sliding Window with Kafka Sink") \
    .config("spark.sql.streaming.statefulOperator.checkCorrectness.enabled", "false") \
    .getOrCreate()



spark.sparkContext.setLogLevel("WARN")

# Define the schema
schema = StructType([
    StructField("ukLongStamp", LongType()),
    StructField("longStamp", LongType()),
    StructField("id_tarea", IntegerType()),
    StructField("fecha", StringType()),
    StructField("tiempo_ejecucion_ms", FloatType()),
    StructField("proceso_principal", StringType()),
    StructField("subproceso", StringType())
])

KAFKA_BROKERS = "172.17.0.1:29092"
KAFKA_TOPIC_INPUT = "transactions-data-process"
KAFKA_TOPIC_OUTPUT = "transactions-data-result"

# Read the data
data_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BROKERS) \
    .option("startingOffsets", "latest") \
    .option("subscribe", KAFKA_TOPIC_INPUT) \
    .option('failOnDataLoss', 'false') \
    .load() \
    .select(from_json(col("value").cast("string"), schema).alias("data"), col("timestamp")) \
    .select("data.*", "timestamp")


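# First sliding window: 5-minute windows sliding every 1 min 40 s,
# collecting the raw (tiempo_ejecucion_ms, timestamp) pairs per proceso_principal/subproceso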
windows_5min_agg = data_stream \
    .withWatermark("timestamp", "40 seconds") \
    .groupBy(
        window(col("timestamp"), "5 minutes", "1 minutes 40 seconds").alias("windows_5min_agg"),
        "proceso_principal",
        "subproceso"
    ) \
    .agg(
        collect_list(struct("tiempo_ejecucion_ms", "timestamp")).alias("struct_list")
    )



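# Expose the 5-minute window bounds as top-level timestamp columns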
windows_5min_agg = windows_5min_agg \
    .withColumn("windows_5min_agg_start", col("windows_5min_agg.start").cast("timestamp")) \
    .withColumn("windows_5min_agg_end", col("windows_5min_agg.end").cast("timestamp"))


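# Flatten the collected structs back into one row per original event so they can be re-windowed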
exploded_5min = windows_5min_agg.selectExpr(
    "proceso_principal", "subproceso",
    "windows_5min_agg_start", "windows_5min_agg_end",
    "explode(struct_list) as struct_values"
)

exploded_5min = exploded_5min.selectExpr(
    "proceso_principal", "subproceso",
    "windows_5min_agg_start", "windows_5min_agg_end",
    "struct_values.tiempo_ejecucion_ms as tiempo_ejecucion_ms",
    "struct_values.timestamp as timestamp"
)


# 30-second window operations (nested sliding window over the exploded rows)
windows_30sec_agg = exploded_5min \
    .withWatermark("timestamp", "8 minutes") \
    .groupBy(
        window(col("timestamp"), "30 seconds", "25 seconds").alias("windows_30sec_agg"),
        "windows_5min_agg_start",
        "windows_5min_agg_end",
        "proceso_principal",
        "subproceso"
    ) \
    .agg(
        stddev("tiempo_ejecucion_ms").alias("stddev_30sec_agg"),
        F.max("timestamp").alias("timestamp_max")
    )


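# Expose the 30-second window bounds as top-level timestamp columns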
windows_30sec_agg = windows_30sec_agg \
    .withColumn("windows_30sec_agg_start", col("windows_30sec_agg.start").cast("timestamp")) \
    .withColumn("windows_30sec_agg_end", col("windows_30sec_agg.end").cast("timestamp"))


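# Serialize each result row to JSON for the Kafka message value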
windows_30sec_agg_kafka = windows_30sec_agg \
    .selectExpr("windows_30sec_agg_start","timestamp_max", "to_json(struct(*)) AS value")

#windows_30sec_agg_kafka = windows_30sec_agg_kafka.withWatermark("timestamp_max", "2 minutes")



# Write to Kafka with 'append' output mode
query_kafka = windows_30sec_agg_kafka \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BROKERS) \
    .option("topic", KAFKA_TOPIC_OUTPUT) \
    .option("checkpointLocation","/opt/bitnami/spark/checkpointuu") \
    .outputMode("append") \
    .start()

# Write to console for debugging (optional)
#query_console = windows_30sec_agg_kafka \
#    .writeStream \
#    .format("console") \
#    .outputMode("update") \
#    .option("truncate", False) \
#    .start()

query_kafka.awaitTermination()
#query_console.awaitTermination()


I expect the output of the smaller 30-second windows to be nested inside the larger 5-minute windows, but I get nothing at all.
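For comparison, here is a minimal sketch of the single-window case that, as described above, does emit in append mode once the 40-second watermark passes each window's end. It reuses data_stream, KAFKA_BROKERS, and KAFKA_TOPIC_OUTPUT from the code above; the checkpoint directory name is a hypothetical placeholder:

# Minimal single-window sketch: one stateful aggregation only.
# The checkpoint path below is hypothetical; adjust to your environment.
single_window_agg = data_stream \
    .withWatermark("timestamp", "40 seconds") \
    .groupBy(
        window(col("timestamp"), "5 minutes", "1 minutes 40 seconds").alias("windows_5min_agg"),
        "proceso_principal",
        "subproceso"
    ) \
    .agg(stddev("tiempo_ejecucion_ms").alias("stddev_5min_agg"))

# Append-mode rows appear only after the watermark crosses a window's end
query_single = single_window_agg \
    .selectExpr("to_json(struct(*)) AS value") \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BROKERS) \
    .option("topic", KAFKA_TOPIC_OUTPUT) \
    .option("checkpointLocation", "/opt/bitnami/spark/checkpoint_single") \
    .outputMode("append") \
    .start()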

windows apache-kafka nested append

Comments


A: No answers yet