提问人:Tobi 提问时间:10/16/2023 最后编辑:Tobi 更新时间:10/16/2023 访问量:21
连接两个 PySpark 数据帧的最有效方法,根据条件透视和填充 NULL
Most efficient way to join two PySpark dataframes, pivot and fill NULL based on condition
问:
我有两个长格式的大型 PySpark 数据帧。 最终表格应采用宽格式。 我不知道什么是最好的方法。
感谢您的支持。
from pyspark.sql import Row
from pyspark.sql import functions as F
data1 = [Row(Component='b1',Subcomponent='a11',Class=1),
Row(Component='b1',Subcomponent='a12',Class=1),
Row(Component='c1',Subcomponent='b1',Class=2),
Row(Component='b2',Subcomponent='a21',Class=1),
Row(Component='b2',Subcomponent='a22',Class=1),
Row(Component='c2',Subcomponent='b2',Class=2)]
df1 = spark.createDataFrame(data1)
df1.show()
data2 = [Row(Part='a11',Parameter='X_01',Value=1101),
Row(Part='a11',Parameter='X_02',Value=1102),
Row(Part='a12',Parameter='X_01',Value=1201),
Row(Part='a12',Parameter='X_02',Value=1202),
Row(Part='b1',Parameter='Y',Value=1),
Row(Part='c1',Parameter='Z',Value=10),
Row(Part='a21',Parameter='X_01',Value=2101),
Row(Part='a21',Parameter='X_02',Value=2102),
Row(Part='a22',Parameter='X_01',Value=2201),
Row(Part='a22',Parameter='X_02',Value=2202),
Row(Part='b2',Parameter='Y',Value=2),
Row(Part='c2',Parameter='Z',Value=20)]
df2 = spark.createDataFrame(data2)
df2.show()
+---------+------------+
|Component|Subcomponent|
+---------+------------+
| b1| a11|
| b1| a12|
| c1| b1|
| b2| a21|
| b2| a22|
| c2| b2|
+---------+------------+
+----+---------+-----+
|Part|Parameter|Value|
+----+---------+-----+
| a11| X_01| 1101|
| a11| X_02| 1102|
| a12| X_01| 1201|
| a12| X_02| 1202|
| b1| Y| 1|
| c1| Z| 10|
| a21| X_01| 2101|
| a21| X_02| 2102|
| a22| X_01| 2201|
| a22| X_02| 2202|
| b2| Y| 2|
| c2| Z| 20|
+----+---------+-----+
结果表应如下所示:
+---------+--------------------------------------+---------+---------+---------+
|Component| X_01_mean|X_02_mean| Y| Z|
+---------+--------------------------------------+---------+---------+---------+
| b1| 1151 (1101+1201)| 1152| 1| Null|
| b2| 2151| 2152| 2| Null|
| c1|1151 (fill out since b1 is part of c1)| 1152| 1| 10|
| c2| 2151| 2152| 2| 20|
+---------+--------------------------------------+---------+---------+---------+
到目前为止,我的代码如下所示:
# join both tables
df12 = (df1.alias('a')
.join(df2.alias('b'),
on=F.col('a.Subcomponent') == F.col('b.Part'),
how='left')
.drop(F.col('b.Component'))
)
# pivot and aggregate class == 1
df12_class1 = (df12
.where(F.col('Class')==1)
.groupby('Component')
.pivot('Parameter')
.agg(F.mean('Value').alias('mean'))
)
# pivot class == 2
df12_class2 = (df12
.where(F.col('Class')==2)
.groupby('Component')
.pivot('Parameter')
.agg(F.first('Value'))
)
# join class1
df12_join = (df12.alias('a')
.select(['Component','Subcomponent'])
.dropDuplicates(['Component'])
.join(df12_class1.alias('b'),
on=F.col('a.Component') == F.col('b.Component'),
how='left')
.drop(F.col('b.Component'))
)
# join class2
df12_join = (df12_join.alias('a')
.join(df12_class2.alias('b'),
on=F.col('a.Component') == F.col('b.Component'),
how='left')
.drop(F.col('b.Component'))
)
df12_join.show()
到目前为止,该表如下所示。
+---------+------------+---------+---------+----+
|Component|Subcomponent|X_01_mean|X_02_mean| Y|
+---------+------------+---------+---------+----+
| b1| a11| 1151| 1152|NULL|
| b2| a21| 2151| 2152|NULL|
| c1| b1| NULL| NULL| 1|
| c2| b2| NULL| NULL|NULL|
+---------+------------+---------+---------+----+
仍然无法弄清楚如何填充 NULL 以及如何添加 Parameter='Z'。 有没有更好的方法来聚合和透视,就像我到目前为止所做的那样?
答: 暂无答案
评论