Sort-merge join strategy still has a sort step even though the data is pre-sorted in PySpark

Asked by: nnqh · Asked: 11/12/2023 · Updated: 11/12/2023 · Views: 40

Q:

from pyspark.sql import SparkSession
from pyspark.sql import functions as f

spark = SparkSession.builder.getOrCreate()

data = [(1, "Alice", "A"),
        (3, "Charlie", "A"),
        (2, "Bob", "B"),
        (4, "David", "B")]
schema = ["id", "name", "partition_key"]

df = spark.createDataFrame(data, schema=schema)

# Write a bucketed table: 2 buckets on "id", rows sorted by "id" within each bucket
df.repartition(2, f.col("id")).write.mode("overwrite") \
    .format("parquet") \
    .bucketBy(2, "id") \
    .sortBy("id") \
    .option("compression", "snappy") \
    .saveAsTable("bucketed_table_H")


spark.sql("""
select *
from bucketed_table_H a join bucketed_table_H b
on a.id = b.id
""").explain(True)

I see this plan:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [id#24L], [id#27L], Inner
   :- Sort [id#24L ASC NULLS FIRST], false, 0
   :  +- Filter isnotnull(id#24L)
   :     +- FileScan parquet default.bucketed_table_h[id#24L,name#25,partition_key#26] Batched: true, Bucketed: true, DataFilters: [isnotnull(id#24L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint,name:string,partition_key:string>, SelectedBucketsCount: 2 out of 2
   +- Sort [id#27L ASC NULLS FIRST], false, 0
      +- Filter isnotnull(id#27L)
         +- FileScan parquet default.bucketed_table_h[id#27L,name#28,partition_key#29] Batched: true, Bucketed: true, DataFilters: [isnotnull(id#27L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file.., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint,name:string,partition_key:string>, SelectedBucketsCount: 2 out of 2 

Why is there still a sort step?

I tried the suggestions from this post: Why does Spark re-sort the data when joining two tables that are bucketed and sorted in the same way?

But the plan I get still has the sort step, as shown above.
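
For context, a minimal sketch to see how many files back each bucket (assuming a default local warehouse directory; Spark lower-cases the table name on disk). Each writer task emits one file per bucket it touches, so a single bucket can be spread across several files:

import os

# Assumes spark.sql.warehouse.dir points at a local directory (the default).
warehouse = spark.conf.get("spark.sql.warehouse.dir").replace("file:", "", 1)
table_dir = os.path.join(warehouse, "bucketed_table_h")

for name in sorted(os.listdir(table_dir)):
    # Bucketed file names encode the bucket id, e.g.
    # part-00000-<uuid>_00001.c000.snappy.parquet -> bucket 1
    print(name)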

apache-spark sorting join pyspark mergesort

Comments:

0 · thebluephantom · 11/12/2023
Please follow the protocol of accepting answers.

A:

0 · thebluephantom · 11/12/2023 · #1

There is no Exchange in the plan. That is the point.

The Sort is for the MergeSort. That is the standard approach. See https://towardsdatascience.com/strategies-of-spark-join-c0e7b4572bcf

In other words, it is working as intended.
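
To see what the bucketing buys you, here is a minimal sketch (reusing the data and schema from the question, and disabling auto-broadcast so the tiny table is not broadcast-joined) of the same self-join without bucketing; its plan does contain Exchange hashpartitioning operators under each side of the SortMergeJoin:

# Force a sort-merge join for this tiny dataset by disabling broadcast joins.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

# Same data, but as a plain (non-bucketed) view: unlike the bucketed join
# above, the plan now shows Exchange hashpartitioning before each Sort.
plain = spark.createDataFrame(data, schema=schema)
plain.createOrReplaceTempView("plain_table")

spark.sql("""
select *
from plain_table a join plain_table b
on a.id = b.id
""").explain(True)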

Quoting the linked SO answer: "Even with the same partitioner, it is not clear whether the rows within each partition are sorted."
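
One hedged follow-up: Spark 3.x has an internal legacy flag that lets the bucketed scan report its per-bucket sort order, in which case the planner can drop the Sort, but only when each bucket is backed by exactly one file. The flag name and exact semantics below are version-dependent assumptions; verify them against the docs for your Spark version.

# Assumption: Spark 3.x internal legacy config, default false. When enabled,
# the scan lists files during planning and reports its output ordering, so the
# Sort can be removed if every bucket consists of a single file.
spark.conf.set("spark.sql.legacy.bucketedTableScan.outputOrdering", "true")

spark.sql("""
select *
from bucketed_table_H a join bucketed_table_H b
on a.id = b.id
""").explain(True)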