Asked by: BC Smith  Asked: 9/12/2020  Last edited by: BC Smith  Updated: 9/15/2020  Views: 660
pyspark agg() with groupby() is not working using pandas_udf
Q:
pyspark groupby().agg() works fine when I use one of min, sum, max, count, mean, or std. But if I supply any of my user-defined functions median_udf, range_udf, or quantile_udf, then groupby().agg() fails with AttributeError: 'function' object has no attribute '_get_object_id'.
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import DateType
spark = SparkSession.builder.getOrCreate()
@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def median_udf(x):
    return np.median(x)

@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def range_udf(x):
    return float(x.max() - x.min())

@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def quantile_udf(x):
    return x.quantile(.50)
df = spark.createDataFrame(
    [
        ('2017-01-01 1:00:00', 1000, 'IC', 280, 11),
        ('2017-03-15 8:15:00', 1000, 'ICE', 280, 2),
        ('2019-03-27 5:00:00', 1002, 'IC', 260, 3),
        ('2019-04-01 11:00:00', 1002, 'IC', 220, 9),
        ('2019-08-07 1:00:00', 1000, 'ICE', 270, 5),
        ('2020-02-22 11:00:00', 1000, 'IC', 280, 3),
        ('2020-02-22 11:00:00', 1000, 'IC', 280, 55),
        ('2020-02-22 11:00:00', 1002, 'ICE', 280, 8),
        ('2020-04-08 9:00:00', 1000, 'IC', 220, 4),
    ],
    ['Date', 'TrainID', 'Traintype', 'Max_Speed', 'Delay']
)
df = df.withColumn("Date", df['Date'].cast(DateType()))
columns = ['Date', 'TrainID', 'Traintype']
# agg_func working - min, sum, max, count, mean, std
# this is working
agg_func = {'Max_Speed': 'min', 'Delay': 'max'}
# this is not working
# agg_func = {'Max_Speed': 'min', 'Delay': range_udf}
df = df.groupby(columns).agg(agg_func)
print('Pyspark group by result')
df.show()
Is there an error in my pandas_udf functions? What do I have to change to make this work?
A: No answers yet