Asked by: nnqh  Asked: 11/12/2023  Updated: 11/12/2023  Views: 40
Sort-merge join strategy still has a sort step despite pre-sorted data in PySpark
Q:
data = [(1, "Alice", "A"),
(3, "Charlie", "A"),
(2, "Bob", "B"),
(4, "David", "B")]
schema = ["id", "name", "partition_key"]
df = spark.createDataFrame(data, schema=schema)
df.repartition(2, f.col("id")).write.mode("overwrite")\
.format("parquet") \
.bucketBy(2, "id") \
.sortBy("id") \
.option("compression", "snappy") \
.saveAsTable("bucketed_table_H")
spark.sql("""
select *
from bucketed_table_H a join bucketed_table_H b
on a.id = b.id
""").explain(True)
I get this plan:
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [id#24L], [id#27L], Inner
:- Sort [id#24L ASC NULLS FIRST], false, 0
: +- Filter isnotnull(id#24L)
: +- FileScan parquet default.bucketed_table_h[id#24L,name#25,partition_key#26] Batched: true, Bucketed: true, DataFilters: [isnotnull(id#24L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint,name:string,partition_key:string>, SelectedBucketsCount: 2 out of 2
+- Sort [id#27L ASC NULLS FIRST], false, 0
+- Filter isnotnull(id#27L)
+- FileScan parquet default.bucketed_table_h[id#27L,name#28,partition_key#29] Batched: true, Bucketed: true, DataFilters: [isnotnull(id#27L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file.., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint,name:string,partition_key:string>, SelectedBucketsCount: 2 out of 2
Why does the plan still have a Sort step?
I tried the suggestion from this post: Why does Spark re-sort the data when the two joined tables are bucketed and sorted in the same way?
But the plan still shows the sort step.
A:
0 votes
thebluephantom
11/12/2023
#1
There is no Exchange in the plan; that is the point.
The Sort applies to the sort-merge join itself. Standard approach. See https://towardsdatascience.com/strategies-of-spark-join-c0e7b4572bcf
In other words, it is working as intended.
Quoting the linked SO answer: "Even with the same partitioner, it is not obvious that the rows are sorted within each partition."
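As for getting rid of the Sort entirely: since Spark 3.0 the bucketed scan ignores the sortBy() ordering by default (a bucket may span several files), and the migration guide points to a legacy flag that restores the old behaviour when each bucket is exactly one file. A minimal sketch, assuming Spark 3.x:

# Minimal sketch, assuming Spark 3.x: trust the per-bucket sort order that
# sortBy() recorded. This only takes effect when each bucket consists of a
# single file, which the repartition(2, "id") before the write is meant to
# guarantee.
spark.conf.set("spark.sql.legacy.bucketedTableScan.outputOrdering", True)

spark.sql("""
select *
from bucketed_table_H a join bucketed_table_H b
on a.id = b.id
""").explain(True)
# If the flag applies, the Sort operators should drop out of the plan,
# leaving the SortMergeJoin over the already-sorted bucketed scans.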
Comments