Asked by: wkeithvan  Asked: 11/18/2023  Updated: 11/18/2023  Views: 39
Pyspark convert array of key/value structs into single struct
Q:
I have a column that is an arbitrary-length array of key/value structs:
StructType([
    StructField("key", StringType(), False),
    StructField("value", StructType([
        StructField("string_value", StringType(), True),
        StructField("int_value", IntegerType(), True),
        StructField("float_value", FloatType(), True),
        StructField("double_value", DoubleType(), True)
    ]))
])
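I know there are only a handful of distinct key names, and I know the data type of each one. For example, name is always a string, birth_year is always an integer, and so on. Not every attribute is always present, so the predefined struct must have all of its fields nullable, for example: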
StructType([
    StructField("first_name", StringType(), True),
    StructField("middle_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("birth_year", IntegerType(), True),
    StructField("ssn", IntegerType(), True),
    StructField("zipcode", IntegerType(), True),
])
My incoming column will look like this:
[
(key: "first_name", value: (string_type: "John")),
(key: "ssn", value: (int_type: 123456789)),
(key: "last_name", value: (string_type: "Doe")),
]
------------------------------------------------------
[
(key: "ssn", value: (int_type: 987654321)),
(key: "last_name", value: (string_type: "Jones")),
]
------------------------------------------------------
[
(key: "zipcode", value: (int_type: 13579)),
(key: "first_name", value: (string_type: "Bob")),
(key: "birth_year", value: (int_type: 1985)),
(key: "last_name", value: (string_type: "Smith")),
]
I want them to become a column of person structs like these:
{
first_name: "John",
last_name: "Doe",
ssn: 123456789
}
------------------------------------------------------
{
last_name: "Jones",
ssn: 987654321
}
------------------------------------------------------
{
first_name: "Bob",
last_name: "Smith",
birth_year: 1985,
zipcode: 13579
}
This is a toy example, but the real data will have several billion rows, so performance matters: the solution should not use Python UDFs and should rely only on pyspark.sql.functions.
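For anyone who wants to reproduce the input, here is a minimal sketch of the three sample rows as plain Python data plus a DDL schema string. It follows the string_type / int_type names used in the sample rows rather than the *_value names in the schema listing; float_type and double_type are assumed by analogy. The names INPUT_DDL, ROWS, str_value, int_value and make_input_df are made up for illustration, and the column is assumed to be called person.

```python
# Schema of the input column as a DDL string (field names are assumptions,
# following the string_type / int_type spelling of the sample rows).
INPUT_DDL = (
    "person array<struct<key:string,"
    "value:struct<string_type:string,int_type:int,"
    "float_type:float,double_type:double>>>"
)


def str_value(text):
    """Value struct with only string_type set (hypothetical helper)."""
    return (text, None, None, None)


def int_value(number):
    """Value struct with only int_type set (hypothetical helper)."""
    return (None, number, None, None)


# One tuple per row; each row has a single column holding the key/value array.
ROWS = [
    ([("first_name", str_value("John")),
      ("ssn", int_value(123456789)),
      ("last_name", str_value("Doe"))],),
    ([("ssn", int_value(987654321)),
      ("last_name", str_value("Jones"))],),
    ([("zipcode", int_value(13579)),
      ("first_name", str_value("Bob")),
      ("birth_year", int_value(1985)),
      ("last_name", str_value("Smith"))],),
]


def make_input_df(spark):
    """Build the sample DataFrame; `spark` is an existing SparkSession."""
    return spark.createDataFrame(ROWS, schema=INPUT_DDL)
```

With a running session, make_input_df(spark).printSchema() should show a single person column of array type.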
A:
0 votes
werner
11/18/2023
#1
For each field of the target struct, filter can be used to pull the matching entry out of the array:
from pyspark.sql import functions as F

df = ...  # input data

# a list of all possible struct entries in the input data:
# (key name, field of the value struct that holds its data)
cfgs = [
    ("first_name", "string_type"),
    ("middle_name", "string_type"),
    ("last_name", "string_type"),
    ("birth_year", "int_type"),
    ("ssn", "int_type"),
    ("zipcode", "int_type")
]

cols = [  # for each element of the cfgs list
    # take the elements of the input array with the correct key
    (F.filter(F.col('person'), lambda c: c['key'] == cfg[0])
        [0]          # take the first result (if any)
        ['value']    # take the value struct
        [cfg[1]])    # take the correct element of the value struct
    .alias(cfg[0])   # rename the column
    for cfg in cfgs]

# combine the columns into a new struct
new_df = df.select(F.struct(cols).alias('person'))
Result:
+------------------------------------------+
|person |
+------------------------------------------+
|{John, null, Doe, null, 123456789, null} |
|{null, null, Jones, null, 987654321, null}|
|{Bob, null, Smith, 1985, null, 13579} |
+------------------------------------------+
root
 |-- person: struct (nullable = false)
 |    |-- first_name: string (nullable = true)
 |    |-- middle_name: string (nullable = true)
 |    |-- last_name: string (nullable = true)
 |    |-- birth_year: long (nullable = true)
 |    |-- ssn: long (nullable = true)
 |    |-- zipcode: long (nullable = true)
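The schema above shows long fields (presumably because of how the test data was typed), while the question's target schema asks for IntegerType. As a rough sketch, the same filter idea can also be written as a Spark SQL expression string, since filter exists as a SQL higher-order function too, with an explicit cast per field. The names FIELDS, field_sql, person_struct_sql and to_person_struct are made up for illustration, and the column is assumed to be called person. Like the [0] in the code above, this relies on an out-of-range array index yielding null, which is the behavior when ANSI mode is off.

```python
# (key name, field of the value struct, target SQL type); hypothetical config
# mirroring the cfgs list above plus the types from the question's schema.
FIELDS = [
    ("first_name", "string_type", "string"),
    ("middle_name", "string_type", "string"),
    ("last_name", "string_type", "string"),
    ("birth_year", "int_type", "int"),
    ("ssn", "int_type", "int"),
    ("zipcode", "int_type", "int"),
]


def field_sql(key, value_field, sql_type, array_col="person"):
    """SQL that picks the first entry with the given key and casts its value."""
    return (
        f"CAST(filter({array_col}, x -> x.key = '{key}')[0]"
        f".value.{value_field} AS {sql_type})"
    )


def person_struct_sql(array_col="person"):
    """SQL for a named_struct with one field per entry in FIELDS."""
    parts = [
        f"'{key}', {field_sql(key, value_field, sql_type, array_col)}"
        for key, value_field, sql_type in FIELDS
    ]
    return "named_struct(" + ", ".join(parts) + ")"


def to_person_struct(df, array_col="person"):
    """Replace the key/value array column with a single typed struct column."""
    return df.selectExpr(f"{person_struct_sql(array_col)} AS person")
```

Calling to_person_struct(df) on the input DataFrame should give the same values as above, with birth_year, ssn and zipcode typed as integer. No Python UDF is involved; the whole expression is evaluated by Spark itself.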