
Pyspark - find the oldest date in a nested collection

Asked by: Smaillns Asked: 4/6/2022 Last edited by: Vincent Doba, Smaillns Updated: 4/6/2022 Views: 280

Q:

I have the following dataframe:

root
 |-- AUTHOR_ID: integer (nullable = false)
 |-- Books: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- NAME: string (nullable = true)
 |    |    |-- DATE: TimestampType (nullable = true)

How can I find the oldest published book for each author? I want to retrieve the date:

{
 "AUTHOR_ID": 1,
 "FIRST_PUBLICATION": <Date>,
 "Books": "[ ... ]"
}
apache-spark pyspark date-comparison

Comments:

0 votes Dipanjan Mallick 4/6/2022
Could you share some sample data along with the expected output?
0 votes Smaillns 4/6/2022
@wwnde I just posted my own answer (see below), thanks a lot

Answers:

1 vote wwnde 4/6/2022 #1

There are multiple ways to do this; let's try window functions.

root
 |-- AUTHOR_ID: integer (nullable = false)
 |-- Books: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- NAME: string (nullable = true)
 |    |    |-- DATE: date (nullable = true)

+---------+--------------------------------+
|AUTHOR_ID|Books                           |
+---------+--------------------------------+
|21       |[{Stories of Mary, 2019-12-01}] |
|34       |[{Sorrows of Mary, 2019-09-01}] |
|34       |[{Sparrows of Mary, 2019-06-16}]|
|21       |[{Songs of Mary, 2017-03-14}]   |
+---------+--------------------------------+

Following your edit:

from pyspark.sql import functions as F
from pyspark.sql.functions import col, row_number, to_json
from pyspark.sql.window import Window

win = Window.partitionBy('AUTHOR_ID').orderBy(F.asc('Books.Date'))
df1 = (
    df.withColumn('rank', row_number().over(win))
      .where(col('rank') == 1).drop('rank')  # keep only the oldest-date row per author
      .withColumn('value', to_json(F.struct(col('AUTHOR_ID'),
                                            col('Books.Date').alias('FIRST_PUBLICATION'),
                                            'Books')))  # create the json column
)
df1.select('value').show(truncate=False)


+-------------------------------------------------------------------------------------------------------------+
|value                                                                                                        |
+-------------------------------------------------------------------------------------------------------------+
|{"AUTHOR_ID":21,"FIRST_PUBLICATION":["2017-03-14"],"Books":[{"NAME":"Songs of Mary","DATE":"2017-03-14"}]}   |
|{"AUTHOR_ID":34,"FIRST_PUBLICATION":["2019-06-16"],"Books":[{"NAME":"Sparrows of Mary","DATE":"2019-06-16"}]}|
+-------------------------------------------------------------------------------------------------------------+
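Note that the window above orders by Books.Date, which is an array of dates; it works here because every row holds a single-element Books array (which is also why FIRST_PUBLICATION shows up as an array in the JSON). If several books were packed into one array per author, a minimal sketch using the same column names (not part of the original answer) would be to explode the array first:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Sketch: one row per book, then keep the earliest book per author.
exploded = df.withColumn('Book', F.explode('Books'))
win = Window.partitionBy('AUTHOR_ID').orderBy(F.col('Book.DATE').asc())
oldest = (exploded
          .withColumn('rank', F.row_number().over(win))
          .where(F.col('rank') == 1)
          .select('AUTHOR_ID', F.col('Book.DATE').alias('FIRST_PUBLICATION'), 'Books'))
oldest.show(truncate=False)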
0 votes Smaillns 4/6/2022 #2

For those using Spark v3, the best solution is to use Spark higher-order functions.

from datetime import datetime
from pyspark.sql import Row
df = spark.createDataFrame(
    [("1", [Row(NAME="xs", DATE=datetime.strptime('2022-04-06 00:00:00', '%Y-%m-%d %H:%M:%S')),
            Row(NAME="s", DATE=datetime.strptime('2022-04-05 00:00:00', '%Y-%m-%d %H:%M:%S'))])],
    'struct<AUTHOR_ID:string,Books:array<struct<NAME:string,DATE:timestamp>>>')

df.show(truncate=False)

+---------+-----------------------------------------------------+
|AUTHOR_ID|Books                                                |
+---------+-----------------------------------------------------+
|1        |[{xs, 2022-04-06 00:00:00}, {s, 2022-04-05 00:00:00}]|
+---------+-----------------------------------------------------+

df.printSchema()

root
 |-- AUTHOR_ID: string (nullable = true)
 |-- Books: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- NAME: string (nullable = true)
 |    |    |-- DATE: timestamp (nullable = true)

We can get the earliest book date for each author as follows:

from pyspark.sql import functions as f

df = df.withColumn('FIRST_PUBLICATION',
                   f.aggregate(
                       'Books',
                       f.lit(datetime.strptime('2222-02-22 22:22:22', '%Y-%m-%d %H:%M:%S')),  # far-future sentinel as the initial value
                       lambda acc, b: f.least(acc, b['DATE'])
                   ))

Result:

# df.show()
+---------+--------------------+-------------------+
|AUTHOR_ID|               Books|  FIRST_PUBLICATION|
+---------+--------------------+-------------------+
|        1|[{xs, 2022-04-06 ...|2022-04-05 00:00:00|
+---------+--------------------+-------------------+
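If you would rather not hard-code a far-future sentinel such as 2222-02-22, one possible variant (a sketch under the same schema, not from the original answer) is to seed the aggregate with the first book's date instead:

from pyspark.sql import functions as f

# Sketch: seed the accumulator with the first element's DATE rather than a sentinel.
df = df.withColumn(
    'FIRST_PUBLICATION',
    f.aggregate(
        'Books',
        f.element_at(f.col('Books'), 1)['DATE'],
        lambda acc, b: f.least(acc, b['DATE'])
    )
)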
0 votes Vincent Doba 4/6/2022 #3

Since Spark 2.4, you can use the array_min function to retrieve the minimum element of an array. Apply it to an array containing only the dates. To build that dates-only array, you can use the getField method on the Books column.

Here is the complete code:

from pyspark.sql import functions as F

df = df.withColumn('FIRST_PUBLICATION', F.array_min(F.col('Books').getField('DATE')))
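To also produce the JSON shape asked for in the question ({AUTHOR_ID, FIRST_PUBLICATION, Books}), a possible follow-up (a sketch, not part of this answer) is to wrap the columns in a struct and serialize it with to_json:

from pyspark.sql import functions as F

# Sketch: one JSON document per author with the requested fields.
result = df.select(
    F.to_json(F.struct('AUTHOR_ID', 'FIRST_PUBLICATION', 'Books')).alias('value')
)
result.show(truncate=False)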