提问人:Aaron C 提问时间:11/15/2023 最后编辑:Aaron C 更新时间:11/19/2023 访问量:153
在熊猫、极地或火炬中有效迭代和应用功能?懒惰可能吗?
efficient iteration & application of a function in pandas, polars or torch? Is lazy possible?
问:
目标: 找到一种有效/最快的方法来逐列迭代表,并在 python 或使用 python 库在每列上运行函数。
背景: 我一直在探索提高功能速度的方法。这是因为我想运行两个模型/算法,一个小,一个大(使用火炬),大的很慢。我一直在使用小的进行测试。小模型是每列的季节性分解。
设置:
Testing environment: ec2, t2 large. X86_64
Python version: 3.11.5
Polars: 0.19.13
pandas: 2.1.1
numpy: 1.26.0
Pandas/Polars 中的演示数据:
rows = 11020
columns = 1578
data = np.random.rand(rows, columns)
df = pd.DataFrame(data)
# df_p = pl.from_pandas(df) # convert if needed.
熊猫
熊猫和字典:
from statsmodels.tsa.seasonal import seasonal_decompose
import pandas as pd
class pdDictTrendExtractor:
def __init__(self, period: int = 365) -> None:
self._period = period
self._model = 'Additive'
def process_col(self, column_data: pd.Series = None) -> torch.Tensor:
self.data = column_data
result = seasonal_decompose(self.data, model=self._model, period=self._period)
trend = result.trend.fillna(0).values
return trend
@classmethod
def process_df(cls, dataframe: pd.DataFrame) -> pd.DataFrame:
trend_data_dict = {}
for column in dataframe.columns:
trend_data_dict[column] = cls().process_col(dataframe[column])
trend_dataframes = pd.DataFrame(trend_data_dict, index=dataframe.index)
return trend_dataframes
import timeit
start = timeit.default_timer()
trend_tensor = pdDictTrendExtractor.process_df(df)
stop = timeit.default_timer()
execution_time = stop - start
print("Program Executed in "+str(execution_time))
程序在 14.349091062998923 中执行
使用列表推导而不是 for 循环:
class pdDict2TrendExtractor:
def __init__(self, period: int = 365) -> None:
self._period = period
self._model = 'Additive'
def process_col(self, column_data: pd.Series = None) -> pd.Series:
self.data = column_data
result = seasonal_decompose(self.data, model=self._model, period=self._period)
trend = result.trend.fillna(0).values
return trend
@classmethod
def process_df(cls, dataframe: pd.DataFrame) -> pd.DataFrame:
trend_data_dict = {column: cls().process_col(dataframe[column]) for column in dataframe.columns}
trend_dataframes = pd.DataFrame(trend_data_dict, index=dataframe.index)
return trend_dataframes
程序在 14.343959668000025 中执行
使用熊猫和火炬的课程:
from statsmodels.tsa.seasonal import seasonal_decompose
import torch
import pandas as pd
class pdTrendExtractor:
def __init__(self, period: int = 365) -> None:
self._period = period
self._model = 'Additive'
# Store data as an instance variable
def process_col(self, column_data: pd.Series = None) -> torch.Tensor:
self.data = column_data
result = seasonal_decompose(self.data, model=self._model, period=self._period)
trend = result.trend.fillna(0).values
return torch.tensor(trend, dtype=torch.float32).view(-1, 1)
@classmethod
def process_df(cls, dataframe: pd.DataFrame) -> torch.Tensor:
trend_dataframes = torch.Tensor()
for column in dataframe.columns:
trend_data = cls().process_col(dataframe[column])
trend_dataframes = torch.cat((trend_dataframes, trend_data), dim=1)
return trend_dataframes
start = timeit.default_timer()
trend_tensor = pdTrendExtractor.process_df(df_p)
stop = timeit.default_timer()
execution_time = stop - start
print("Program Executed in "+str(execution_time))
程序执行于 23.14214362200073
使用字典、多处理和列表推导: 正如下面的@roganjosh和@jqurious所建议的那样。
from multiprocessing import Pool
class pdMTrendExtractor:
def __init__(self, period: int = 365) -> None:
self._period = period
self._model = 'Additive'
def process_col(self, column_data: pd.Series = None) -> pd.Series:
result = seasonal_decompose(column_data, model=self._model, period=self._period)
trend = result.trend.fillna(0).values
return trend
@classmethod
def process_df(cls, dataframe: pd.DataFrame) -> pd.DataFrame:
with Pool() as pool:
trend_data_dict = dict(zip(dataframe.columns, pool.map(cls().process_col, [dataframe[column] for column in dataframe.columns])))
return pd.DataFrame(trend_data_dict, index=dataframe.index)
程序在 4.582350738997775 中执行,又好又快。
极地
极地和火炬:
class plTorTrendExtractor:
def __init__(self, period: int = 365) -> None:
self._period = period
self._model = 'Additive'
# Store data as an instance variable
def process_col(self, column_data: pl.Series = None) -> torch.Tensor:
self.data = column_data
result = seasonal_decompose(self.data, model=self._model, period=self._period)
trend = result.trend[np.isnan(result.trend)] = 0
return torch.tensor(trend, dtype=torch.float32).view(-1, 1)
@classmethod
def process_df(cls, dataframe: pl.DataFrame) -> torch.Tensor:
trend_dataframes = torch.Tensor()
for column in dataframe.columns:
trend_data = cls().process_col(dataframe[column])
trend_dataframes = torch.cat((trend_dataframes, trend_data), dim=1)
return trend_dataframes
程序执行于 13.813817326999924
Polars & lamdba:
start = timeit.default_timer()
df_p = df_p.select([
pl.all().map_batches(lambda x: pl.Series(seasonal_decompose(x, model="Additive", period=365).trend)).fill_nan(0)
]
)
stop = timeit.default_timer()
execution_time = stop - start
print("Program Executed in "+str(execution_time))
程序执行于 82.5596211330012
我怀疑这写得很差,也是它这么慢的原因。我还没有找到更好的方法。
到目前为止,我已经尝试过、apply_many、应用、地图、map_batches或map_elements。with_columns vs select 和其他一些组合。
仅限极地,for 循环:
class plTrendExtractor:
def __init__(self, period: int = 365) -> None:
self._period = period
self._model = 'Additive'
# Store data as an instance variable
def process_col(self, column_data: pl.Series = None) -> pl.DataFrame:
self.data = column_data
result = seasonal_decompose(self.data, model=self._model, period=self._period)
# Handle missing values by replacing NaN with 0
result.trend[np.isnan(result.trend)] = 0
return pl.DataFrame({column_data.name: result.trend})
@classmethod
def process_df(cls, dataframe: pl.DataFrame) -> pl.DataFrame:
trend_dataframes = pl.DataFrame()
for column in dataframe.columns:
trend_data = cls().process_col(dataframe[column])
trend_dataframes = trend_dataframes.hstack(trend_data)
return trend_dataframes
程序在 13.34212675299932 中执行
使用列表推导式:
我尝试了极坐标和列表理解。但是在极性语法方面有困难。
使用 dict & for 循环:
程序执行于 13.743039597999996
使用字典和列表推导:
class plDict2TrendExtractor:
def __init__(self, period: int = 365) -> None:
self._period = period
self._model = 'Additive'
def process_col(self, column_data: pl.Series = None) -> pl.Series:
self.data = column_data
result = seasonal_decompose(self.data, model=self._model, period=self._period)
result.trend[np.isnan(result.trend)] = 0
return pl.Series(result.trend)
@classmethod
def process_df(cls, dataframe: pl.DataFrame) -> pl.DataFrame:
trend_data_dict = {column: cls().process_col(dataframe[column]) for column in dataframe.columns}
trend_dataframes = pl.DataFrame(trend_data_dict)
return trend_dataframes
程序执行于 13.008102383002552
使用字典、多处理和列表推导: 正如下面的@roganjosh和@jqurious所建议的那样。
from multiprocessing import Pool
class plMTrendExtractor:
def __init__(self, period: int = 365) -> None:
self._period = period
self._model = 'Additive'
def process_col(self, column_data: pl.Series = None) -> pl.Series:
result = seasonal_decompose(column_data, model=self._model, period=self._period)
result.trend[np.isnan(result.trend)] = 0
return pl.Series(result.trend)
@classmethod
def process_df(cls, dataframe: pl.DataFrame) -> pl.DataFrame:
with Pool() as pool:
trend_data_dict = dict(zip(dataframe.columns, pool.map(cls().process_col, [dataframe[column] for column in dataframe.columns])))
return pl.DataFrame(trend_data_dict)
程序在 4.997288776001369 中执行,不错!。
使用 lazyFrame?
我可以在上面的方法中添加懒惰和收集,但这样做并不能改善时间。其中一个关键问题似乎是传递给惰性操作的函数也需要惰性。我希望它可以并行运行每一列。df_p.select()
当前结论和说明
- 对于某些运行,我有一秒到半秒的变化。
- 熊猫和字典,似乎是合理的。如果您关心索引,那么这可能是一个不错的选择。
- 具有字典和列表理解的极点是“最快的”。但相差不大。考虑到变化甚至更小的差异。
- 这两种选择还具有不需要额外软件包的好处。
- 极地似乎还有改进的余地。就更好的代码而言,但不确定这是否会大大缩短时间。作为主要计算时间,计算时间seasonal_decompose。如果单独运行,每列需要 ~0.012 秒。
- 对任何改进反馈持开放态度
- 警告:我尚未对上述函数进行完整的输出验证。
- 变量从process_col返回的方式对速度影响不大。不出所料,也是我在这里调整的一部分。例如,对于极坐标,如果我返回 numpy 数组,我的时间会变慢。如果我返回一个 numpy 数组,但声明 -> pl.series,这似乎与速度大致相同,一两次试验更快(然后在上面)。
反馈后/添加多处理
- 惊喜惊喜,多处理取胜。这似乎与大熊猫或极地无关。
答:
对于 Polars,在这种情况下使用 和 是一种“反模式”。.select()
.map_batches()
您将所有数据都通过 Polars 表达式引擎,将其传回 Python 以运行您的外部函数,然后再次将其传回 Polars。
您可以绕过它,只需直接传递每个列(类似于在 Pandas 方法中遍历每个列的方式):Series
seasonal_decompose()
pl.DataFrame({
col.name: seasonal_decompose(col, model="Additive", period=365).trend
for col in df_p
})
不过,我确实注意到的一件事是,如果您从每列创建一个 LazyFrame 并使用 pl.collect_all(),
它会将方法速度提高 ~50%。(也许可以对此进行调查。.map_batches()
(虽然仍然比理解稍慢。
lf = df_p.lazy()
lazy_columns = [
lf.select(pl.col(col).map_batches(
lambda x: pl.Series(seasonal_decompose(x, model="Additive", period=365).trend))
)
for col in lf.columns
]
out = pl.concat(pl.collect_all(lazy_columns), how="horizontal")
从本质上讲,问题变成了“我怎样才能并行化一个 Python for 循环?
正如@roganjosh所指出的,这是通过多处理完成的。
from multiprocessing import get_context
...
if __name__ == "__main__":
df_p = ...
with get_context("spawn").Pool() as pool:
columns = pool.map(process_column, (col for col in df_p))
出于兴趣,与常规理解相比,该示例在多处理的情况下运行速度提高了 ~50%。
但它非常特定于任务/数据/平台,因此您可以在本地对其进行基准测试。
评论
pl.collect_all()
.select()
.map_batches()
pl.Series
评论
multiprocessing
polars