连接两个 PySpark 数据帧的最有效方法,根据条件透视和填充 NULL

Most efficient way to join two PySpark dataframes, pivot and fill NULL based on condition

提问人:Tobi 提问时间:10/16/2023 最后编辑:Tobi 更新时间:10/16/2023 访问量:21

问:

我有两个长格式的大型 PySpark 数据帧。 最终表格应采用宽格式。 我不知道什么是最好的方法。

感谢您的支持。

from pyspark.sql import Row
from pyspark.sql import functions as F

data1 = [Row(Component='b1',Subcomponent='a11',Class=1),
         Row(Component='b1',Subcomponent='a12',Class=1),
         Row(Component='c1',Subcomponent='b1',Class=2),
         Row(Component='b2',Subcomponent='a21',Class=1),
         Row(Component='b2',Subcomponent='a22',Class=1),
         Row(Component='c2',Subcomponent='b2',Class=2)]
df1 = spark.createDataFrame(data1)
df1.show()

data2 = [Row(Part='a11',Parameter='X_01',Value=1101),
         Row(Part='a11',Parameter='X_02',Value=1102),
         Row(Part='a12',Parameter='X_01',Value=1201),
         Row(Part='a12',Parameter='X_02',Value=1202),
         Row(Part='b1',Parameter='Y',Value=1),
         Row(Part='c1',Parameter='Z',Value=10),
         Row(Part='a21',Parameter='X_01',Value=2101),
         Row(Part='a21',Parameter='X_02',Value=2102),
         Row(Part='a22',Parameter='X_01',Value=2201),
         Row(Part='a22',Parameter='X_02',Value=2202),
         Row(Part='b2',Parameter='Y',Value=2),
         Row(Part='c2',Parameter='Z',Value=20)]
df2 = spark.createDataFrame(data2)
df2.show()
+---------+------------+
|Component|Subcomponent|
+---------+------------+
|       b1|         a11|
|       b1|         a12|
|       c1|          b1|
|       b2|         a21|
|       b2|         a22|
|       c2|          b2|
+---------+------------+

+----+---------+-----+
|Part|Parameter|Value|
+----+---------+-----+
| a11|     X_01| 1101|
| a11|     X_02| 1102|
| a12|     X_01| 1201|
| a12|     X_02| 1202|
|  b1|        Y|    1|
|  c1|        Z|   10|
| a21|     X_01| 2101|
| a21|     X_02| 2102|
| a22|     X_01| 2201|
| a22|     X_02| 2202|
|  b2|        Y|    2|
|  c2|        Z|   20|
+----+---------+-----+

结果表应如下所示:

+---------+--------------------------------------+---------+---------+---------+
|Component|                             X_01_mean|X_02_mean|        Y|        Z|
+---------+--------------------------------------+---------+---------+---------+
|       b1|                      1151 (1101+1201)|     1152|        1|     Null|
|       b2|                                  2151|     2152|        2|     Null|
|       c1|1151 (fill out since b1 is part of c1)|     1152|        1|       10|
|       c2|                                  2151|     2152|        2|       20|
+---------+--------------------------------------+---------+---------+---------+

到目前为止,我的代码如下所示:

# join both tables
df12 = (df1.alias('a')
  .join(df2.alias('b'),
    on=F.col('a.Subcomponent') == F.col('b.Part'),
    how='left')
  .drop(F.col('b.Component'))
)

# pivot and aggregate class == 1
df12_class1 = (df12
  .where(F.col('Class')==1)
  .groupby('Component')
  .pivot('Parameter')
  .agg(F.mean('Value').alias('mean'))
)

# pivot class == 2
df12_class2 = (df12
  .where(F.col('Class')==2)
  .groupby('Component')
  .pivot('Parameter')
  .agg(F.first('Value'))
)

# join class1
df12_join = (df12.alias('a')
  .select(['Component','Subcomponent'])
  .dropDuplicates(['Component'])
  .join(df12_class1.alias('b'),
    on=F.col('a.Component') == F.col('b.Component'),
    how='left')
  .drop(F.col('b.Component'))
)

# join class2
df12_join = (df12_join.alias('a')
  .join(df12_class2.alias('b'),
    on=F.col('a.Component') == F.col('b.Component'),
    how='left')
  .drop(F.col('b.Component'))
)
df12_join.show()

到目前为止,该表如下所示。

+---------+------------+---------+---------+----+
|Component|Subcomponent|X_01_mean|X_02_mean|   Y|
+---------+------------+---------+---------+----+
|       b1|         a11|     1151|     1152|NULL|
|       b2|         a21|     2151|     2152|NULL|
|       c1|          b1|     NULL|     NULL|   1|
|       c2|          b2|     NULL|     NULL|NULL|
+---------+------------+---------+---------+----+

仍然无法弄清楚如何填充 NULL 以及如何添加 Parameter='Z'。 有没有更好的方法来聚合和透视,就像我到目前为止所做的那样?

加入 PySpark Pivot Coalesce

评论


答: 暂无答案