
Pyspark convert array of key/value structs into single struct

Asked by wkeithvan on 11/18/2023 · Updated 11/18/2023 · Viewed 39 times

Q:

I have a column that is an array of key/value structs of arbitrary length:

StructType([
    StructField("key", StringType(), False),
    StructField("value", StructType([
        StructField("string_type", StringType(),  True),
        StructField("int_type",    IntegerType(), True),
        StructField("float_type",  FloatType(),   True),
        StructField("double_type", DoubleType(),  True)
    ]), False)
])

I know there are only a few possible key names, and what the data type of each one is. For example, name is always a string, birth_year is always an integer, and so on. Not every attribute is always present, so the predefined struct must make every field nullable, for example:

StructType([
    StructField("first_name",  StringType(),  True),
    StructField("middle_name", StringType(),  True),
    StructField("last_name",   StringType(),  True),
    StructField("birth_year",  IntegerType(), True),
    StructField("ssn",         IntegerType(), True),
    StructField("zipcode",     IntegerType(), True),
])

My incoming column will look like this:

[
    (key: "first_name", value: (string_type: "John")),
    (key: "ssn",        value: (int_type:    123456789)),
    (key: "last_name",  value: (string_type: "Doe")),
]
------------------------------------------------------
[
    (key: "ssn",        value: (int_type:    987654321)),
    (key: "last_name",  value: (string_type: "Jones")),
]
------------------------------------------------------
[
    (key: "zipcode",    value: (int_type:    13579)),
    (key: "first_name", value: (string_type: "Bob")),
    (key: "birth_year", value: (int_type:    1985)),
    (key: "last_name",  value: (string_type: "Smith")),
]

I would like these to become a column of person structs, like this:

{
    first_name: "John",
    last_name:  "Doe",
    ssn:        123456789
}
------------------------------------------------------
{
    last_name:  "Jones",
    ssn:        987654321
}
------------------------------------------------------
{
    first_name: "Bob",
    last_name:  "Smith",
    birth_year: 1985,
    zipcode:    13579
}

This is a playground example, but the real data will have several billion rows, so performance matters: the solution should not use Python UDFs, only pyspark.sql.functions.

apache-spark pyspark apache-spark-sql



A:

0 votes · werner · 11/18/2023 · #1

For each field of the desired struct, filter can be used to extract the expected value from the array:

from pyspark.sql import functions as F

df = ...input data...

# a list of all possible struct entries in the input data
cfgs = [
    ("first_name", "string_type"),
    ("middle_name", "string_type"),
    ("last_name", "string_type"),
    ("birth_year", "int_type"),
    ("ssn", "int_type"),
    ("zipcode", "int_type")
]

cols = [            # for each element of the cfgs list
                    # take the element of the input array with the correct key
    (F.filter(F.col('person'), lambda c: c['key']==cfg[0])
      [0]           # take the first result (if any)
      ['value']     # take the value struct
      [cfg[1]])     # take the correct element of the value struct
    .alias(cfg[0])  # rename the column
  for cfg in cfgs]

# combine the columns into a new struct
new_df = df.select(F.struct(cols).alias('person'))

Result:

+------------------------------------------+
|person                                    |
+------------------------------------------+
|{John, null, Doe, null, 123456789, null}  |
|{null, null, Jones, null, 987654321, null}|
|{Bob, null, Smith, 1985, null, 13579}     |
+------------------------------------------+

root
 |-- person: struct (nullable = false)
 |    |-- first_name: string (nullable = true)
 |    |-- middle_name: string (nullable = true)
 |    |-- last_name: string (nullable = true)
 |    |-- birth_year: long (nullable = true)
 |    |-- ssn: long (nullable = true)
 |    |-- zipcode: long (nullable = true)