提问人:Smaillns 提问时间:4/6/2022 最后编辑:Vincent DobaSmaillns 更新时间:4/6/2022 访问量:280
Pyspark - 查找嵌套集合中最早的日期
Pyspark - find the oldest date in a nested collection
问:
我有以下数据帧
root
|-- AUTHOR_ID: integer (nullable = false)
|-- Books: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- NAME: string (nullable = true)
| | |-- DATE: TimestampType (nullable = true)
如何找到每个作者最早出版的书?我想检索日期
{
"AUTHOR_ID": 1,
"FIRST_PUBLICATION": <Date>
"Books": "[ ... ]"
}
答:
1赞
wwnde
4/6/2022
#1
多种方法,让我们尝试窗口函数
root
|-- AUTHOR_ID: integer (nullable = false)
|-- Books: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- NAME: string (nullable = true)
| | |-- DATE: date (nullable = true)
+---------+--------------------------------+
|AUTHOR_ID|Books |
+---------+--------------------------------+
|21 |[{Stories of Mary, 2019-12-01}] |
|34 |[{Sorrows of Mary, 2019-09-01}] |
|34 |[{Sparrows of Mary, 2019-06-16}]|
|21 |[{Songs of Mary, 2017-03-14}] |
+---------+--------------------------------+
跟随您的编辑
win=Window.partitionBy('AUTHOR_ID').orderBy(F.asc('Books.Date'))
df1=(
df.withColumn("rank", row_number().over(win)==1).where(col('rank')==1).drop('rank')#Filter by oldest date
.withColumn('value', to_json(F.struct(col('AUTHOR_ID'),col('Books.Date').alias('FIRST_PUBLICATION'),'Books')))#Create json column
).select('value').show(truncate=False)
+-------------------------------------------------------------------------------------------------------------+
|value |
+-------------------------------------------------------------------------------------------------------------+
|{"AUTHOR_ID":21,"FIRST_PUBLICATION":["2017-03-14"],"Books":[{"NAME":"Songs of Mary","DATE":"2017-03-14"}]} |
|{"AUTHOR_ID":34,"FIRST_PUBLICATION":["2019-06-16"],"Books":[{"NAME":"Sparrows of Mary","DATE":"2019-06-16"}]}|
+-------------------------------------------------------------------------------------------------------------+
0赞
Smaillns
4/6/2022
#2
对于使用是最好的解决方案,Spark v3
Spark Higher-order functions
df = spark.createDataFrame([("1", [Row(NAME="xs", DATE=datetime.strptime('2022-04-06 00:00:00', '%Y-%m-%d %H:%M:%S')),
Row(NAME="s", DATE=datetime.strptime('2022-04-05 00:00:00', '%Y-%m-%d %H:%M:%S')),]), ],
'struct<AUTHOR_ID:string,Books:array<struct<NAME:string,DATE:timestamp>>>')
df.show(truncate=False)
+---------+-----------------------------------------------------+
|AUTHOR_ID|Books |
+---------+-----------------------------------------------------+
|1 |[{xs, 2022-04-06 00:00:00}, {s, 2022-04-05 00:00:00}]|
+---------+-----------------------------------------------------+
df.printSchema()
root
|-- AUTHOR_ID: string (nullable = true)
|-- Books: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- NAME: string (nullable = true)
| | |-- DATE: timestamp (nullable = true)
我们可以得到每个作者日期最短的书,如下所示
df = df.withColumn('FIRST_PUBLICATION',
f.aggregate(
'Books',
f.lit(datetime.strptime('2222-02-22 22:22:22', '%Y-%m-%d %H:%M:%S')),
lambda acc, b : f.least(acc, b['DATE'])
)
)
结果
# df.show()
+---------+--------------------+-------------------+
|AUTHOR_ID| Books| FIRST_PUBLICATION|
+---------+--------------------+-------------------+
| 1|[{xs, 2022-04-06 ...|2022-04-05 00:00:00|
+---------+--------------------+-------------------+
0赞
Vincent Doba
4/6/2022
#3
从 Spark 2.4 开始,可以使用 array_min
函数来检索数组的最小元素。将此函数应用于仅包含日期的数组。若要生成仅包含日期的数组,可以在列上使用 getField
方法。Books
以下是完整的代码:
from pyspark.sql import functions as F
df = df.withColumn('FIRST_PUBLICATION', F.array_min(F.col('Books').getField('DATE')))
评论