在熊猫、极地或火炬中有效迭代和应用功能?懒惰可能吗?

efficient iteration & application of a function in pandas, polars or torch? Is lazy possible?

提问人:Aaron C 提问时间:11/15/2023 最后编辑:Aaron C 更新时间:11/19/2023 访问量:153

问:

目标: 找到一种有效/最快的方法来逐列迭代表,并在 python 或使用 python 库在每列上运行函数。

背景: 我一直在探索提高功能速度的方法。这是因为我想运行两个模型/算法,一个小,一个大(使用火炬),大的很慢。我一直在使用小的进行测试。小模型是每列的季节性分解。

设置

Testing environment: ec2, t2 large. X86_64
Python version: 3.11.5 
Polars: 0.19.13 
pandas: 2.1.1 
numpy: 1.26.0

Pandas/Polars 中的演示数据:

rows = 11020
columns = 1578
data = np.random.rand(rows, columns)
df = pd.DataFrame(data)
# df_p = pl.from_pandas(df) # convert if needed.

熊猫

熊猫和字典:

from statsmodels.tsa.seasonal import seasonal_decompose
import pandas as pd

class pdDictTrendExtractor:
        
    def __init__(self, period: int = 365) -> None:
        self._period = period
        self._model = 'Additive'
        
    def process_col(self, column_data: pd.Series = None) -> torch.Tensor:
        self.data = column_data 
        result = seasonal_decompose(self.data, model=self._model, period=self._period)
        trend = result.trend.fillna(0).values
        return trend

    @classmethod
    def process_df(cls,  dataframe: pd.DataFrame) -> pd.DataFrame:
        trend_data_dict = {}
        for column in dataframe.columns:
                trend_data_dict[column] = cls().process_col(dataframe[column])
        trend_dataframes = pd.DataFrame(trend_data_dict, index=dataframe.index)
        return trend_dataframes

import timeit
start = timeit.default_timer()
trend_tensor = pdDictTrendExtractor.process_df(df)
stop = timeit.default_timer()
execution_time = stop - start

print("Program Executed in "+str(execution_time))

程序在 14.349091062998923 中执行

使用列表推导而不是 for 循环:

class pdDict2TrendExtractor:
        
    def __init__(self, period: int = 365) -> None:
        self._period = period
        self._model = 'Additive'
        
    def process_col(self, column_data: pd.Series = None) -> pd.Series:
        self.data = column_data 
        result = seasonal_decompose(self.data, model=self._model, period=self._period)
        trend = result.trend.fillna(0).values
        return trend

    @classmethod
    def process_df(cls,  dataframe: pd.DataFrame) -> pd.DataFrame:
        trend_data_dict = {column: cls().process_col(dataframe[column]) for column in dataframe.columns}
        trend_dataframes = pd.DataFrame(trend_data_dict, index=dataframe.index)
        return trend_dataframes

程序在 14.343959668000025 中执行

使用熊猫和火炬的课程:

from statsmodels.tsa.seasonal import seasonal_decompose
import torch
import pandas as pd

class pdTrendExtractor:
        
    def __init__(self, period: int = 365) -> None:
        self._period = period
        self._model = 'Additive'
         # Store data as an instance variable
        
    def process_col(self, column_data: pd.Series = None) -> torch.Tensor:
        self.data = column_data 
        result = seasonal_decompose(self.data, model=self._model, period=self._period)
        trend = result.trend.fillna(0).values
        return torch.tensor(trend, dtype=torch.float32).view(-1, 1)

    @classmethod
    def process_df(cls,  dataframe: pd.DataFrame) -> torch.Tensor:
        trend_dataframes = torch.Tensor()
        for column in dataframe.columns:
            trend_data = cls().process_col(dataframe[column])
            trend_dataframes = torch.cat((trend_dataframes, trend_data), dim=1)
        return trend_dataframes


start = timeit.default_timer()
trend_tensor = pdTrendExtractor.process_df(df_p)
stop = timeit.default_timer()
execution_time = stop - start

print("Program Executed in "+str(execution_time))

程序执行于 23.14214362200073

使用字典、多处理和列表推导: 正如下面的@roganjosh和@jqurious所建议的那样。

from multiprocessing import Pool

class pdMTrendExtractor:
        
    def __init__(self, period: int = 365) -> None:
        self._period = period
        self._model = 'Additive'
        
    def process_col(self, column_data: pd.Series = None) -> pd.Series:
        result = seasonal_decompose(column_data, model=self._model, period=self._period)
        trend = result.trend.fillna(0).values
        return trend

    @classmethod
    def process_df(cls, dataframe: pd.DataFrame) -> pd.DataFrame:
        with Pool() as pool:
            trend_data_dict = dict(zip(dataframe.columns, pool.map(cls().process_col, [dataframe[column] for column in dataframe.columns])))

        return pd.DataFrame(trend_data_dict, index=dataframe.index)

程序在 4.582350738997775 中执行,又好又快。

极地

极地和火炬:

class plTorTrendExtractor:
        
    def __init__(self, period: int = 365) -> None:
        self._period = period
        self._model = 'Additive'
         # Store data as an instance variable
        
    def process_col(self, column_data: pl.Series = None) -> torch.Tensor:
        self.data = column_data 
        result = seasonal_decompose(self.data, model=self._model, period=self._period)
        trend = result.trend[np.isnan(result.trend)] = 0
        return torch.tensor(trend, dtype=torch.float32).view(-1, 1)

    @classmethod
    def process_df(cls,  dataframe: pl.DataFrame) -> torch.Tensor:
        trend_dataframes = torch.Tensor()
        for column in dataframe.columns:
            trend_data = cls().process_col(dataframe[column])
            trend_dataframes = torch.cat((trend_dataframes, trend_data), dim=1)
        return trend_dataframes

程序执行于 13.813817326999924

Polars & lamdba:

start = timeit.default_timer()

df_p = df_p.select([
    pl.all().map_batches(lambda x: pl.Series(seasonal_decompose(x, model="Additive", period=365).trend)).fill_nan(0)
]
)
stop = timeit.default_timer()
execution_time = stop - start

print("Program Executed in "+str(execution_time))

程序执行于 82.5596211330012

我怀疑这写得很差,也是它这么慢的原因。我还没有找到更好的方法。

到目前为止,我已经尝试过、apply_many、应用、地图、map_batches或map_elements。with_columns vs select 和其他一些组合。

仅限极地,for 循环:

class plTrendExtractor:
        
    def __init__(self, period: int = 365) -> None:
        self._period = period
        self._model = 'Additive'
         # Store data as an instance variable
        
    def process_col(self, column_data: pl.Series = None) -> pl.DataFrame:
        self.data = column_data 
        result = seasonal_decompose(self.data, model=self._model, period=self._period)
        
        # Handle missing values by replacing NaN with 0
        result.trend[np.isnan(result.trend)] = 0
        return pl.DataFrame({column_data.name: result.trend})

    @classmethod
    def process_df(cls,  dataframe: pl.DataFrame) -> pl.DataFrame:
        trend_dataframes = pl.DataFrame()
        for column in dataframe.columns:
            trend_data = cls().process_col(dataframe[column])
            trend_dataframes = trend_dataframes.hstack(trend_data)
        return trend_dataframes

程序在 13.34212675299932 中执行

使用列表推导式:

我尝试了极坐标和列表理解。但是在极性语法方面有困难。

使用 dict & for 循环:

程序执行于 13.743039597999996

使用字典和列表推导:

class plDict2TrendExtractor:
        
    def __init__(self, period: int = 365) -> None:
        self._period = period
        self._model = 'Additive'
        
    def process_col(self, column_data: pl.Series = None) ->  pl.Series:
        self.data = column_data 
        result = seasonal_decompose(self.data, model=self._model, period=self._period)
        result.trend[np.isnan(result.trend)] = 0
        return pl.Series(result.trend)

    @classmethod
    def process_df(cls,  dataframe: pl.DataFrame) -> pl.DataFrame:
        trend_data_dict = {column: cls().process_col(dataframe[column]) for column in dataframe.columns}
        trend_dataframes = pl.DataFrame(trend_data_dict)
        return trend_dataframes

程序执行于 13.008102383002552

使用字典、多处理和列表推导: 正如下面的@roganjosh和@jqurious所建议的那样。

from multiprocessing import Pool

class plMTrendExtractor:
        
    def __init__(self, period: int = 365) -> None:
        self._period = period
        self._model = 'Additive'
        
    def process_col(self, column_data: pl.Series = None) -> pl.Series:
        result = seasonal_decompose(column_data, model=self._model, period=self._period)
        result.trend[np.isnan(result.trend)] = 0
        return pl.Series(result.trend)

    @classmethod
    def process_df(cls, dataframe: pl.DataFrame) -> pl.DataFrame:
        with Pool() as pool:
            trend_data_dict = dict(zip(dataframe.columns, pool.map(cls().process_col, [dataframe[column] for column in dataframe.columns])))

        return pl.DataFrame(trend_data_dict)

程序在 4.997288776001369 中执行,不错!。

使用 lazyFrame?

我可以在上面的方法中添加懒惰和收集,但这样做并不能改善时间。其中一个关键问题似乎是传递给惰性操作的函数也需要惰性。我希望它可以并行运行每一列。df_p.select()

当前结论和说明

  • 对于某些运行,我有一秒到半秒的变化。
  • 熊猫和字典,似乎是合理的。如果您关心索引,那么这可能是一个不错的选择。
  • 具有字典和列表理解的极点是“最快的”。但相差不大。考虑到变化甚至更小的差异。
  • 这两种选择还具有不需要额外软件包的好处。
  • 极地似乎还有改进的余地。就更好的代码而言,但不确定这是否会大大缩短时间。作为主要计算时间,计算时间seasonal_decompose。如果单独运行,每列需要 ~0.012 秒。
  • 对任何改进反馈持开放态度
  • 警告:我尚未对上述函数进行完整的输出验证。
  • 变量从process_col返回的方式对速度影响不大。不出所料,也是我在这里调整的一部分。例如,对于极坐标,如果我返回 numpy 数组,我的时间会变慢。如果我返回一个 numpy 数组,但声明 -> pl.series,这似乎与速度大致相同,一两次试验更快(然后在上面)。

反馈后/添加多处理

  • 惊喜惊喜,多处理取胜。这似乎与大熊猫或极地无关。
python-3.x 熊猫 pytorch python-polars

评论

1赞 roganjosh 11/15/2023
当它需要一次使用整个列数据才能工作时,你怎么能有一个懒惰的方法?
1赞 roganjosh 11/15/2023
无论如何,我给了你关于如何加快速度的建议(无论你使用什么 df 库)。并行化 ;每根柱子都是独立处理的,因此在进程之间分配工作multiprocessing
1赞 roganjosh 11/15/2023
python 中没有任何内容本质上是并行的。时期。全局解释器锁 (GIL) 意味着在单个进程中,任何时候只执行一位字节码。 可以通过将进程传递给编译的 Rust 代码来解决这个问题,这些代码旨在以多线程方式工作,然后再将结果交还给 python。这与惰性评估不同,但这不是你在这里所做的。您正在获取一个完全成型的列,并将其传递给需要一次性完成整个列的单个进程。以串行方式。polars
1赞 roganjosh 11/15/2023
我正处于第 22 条军规中,现在就尝试为您解决这个问题。天色已晚,但我看得出你的误解,只是现在无法测试。我很快就会尝试演示。与此同时,我希望能提供一些线索来查找,我强烈建议您掌握“懒惰评估”,因为它对您的理解是一条红鲱鱼:)
2赞 Dean MacGregor 11/16/2023
我刚刚知道这个库,它似乎可以解决问题,所以你不必重新发明编译的季节性轮。

答:

2赞 jqurious 11/15/2023 #1

对于 Polars,在这种情况下使用 和 是一种“反模式”。.select().map_batches()

您将所有数据都通过 Polars 表达式引擎,将其传回 Python 以运行您的外部函数,然后再次将其传回 Polars。

您可以绕过它,只需直接传递每个列(类似于在 Pandas 方法中遍历每个列的方式):Seriesseasonal_decompose()

pl.DataFrame({
    col.name: seasonal_decompose(col, model="Additive", period=365).trend
    for col in df_p
})

不过,我确实注意到的一件事是,如果您从每列创建一个 LazyFrame 并使用 pl.collect_all(),它会将方法速度提高 ~50%。(也许可以对此进行调查。.map_batches()

(虽然仍然比理解稍慢。

lf = df_p.lazy()

lazy_columns = [ 
    lf.select(pl.col(col).map_batches(
        lambda x: pl.Series(seasonal_decompose(x, model="Additive", period=365).trend))
    ) 
    for col in lf.columns 
]

out = pl.concat(pl.collect_all(lazy_columns), how="horizontal")

从本质上讲,问题变成了“我怎样才能并行化一个 Python for 循环?

正如@roganjosh所指出的,这是通过多处理完成的。

from multiprocessing import get_context

...

if __name__ == "__main__":

    df_p = ...

    with get_context("spawn").Pool() as pool:
        columns = pool.map(process_column, (col for col in df_p))

出于兴趣,与常规理解相比,该示例在多处理的情况下运行速度提高了 ~50%。

但它非常特定于任务/数据/平台,因此您可以在本地对其进行基准测试。

评论

0赞 Dean MacGregor 11/15/2023
这真的很有趣,从字典理解中构建一个新的 df 和 pandas 方法一样快,但我不明白为什么选择方法差几个数量级。我认为选择正在处理的任何开销,df 句柄的创建也是如此。
1赞 Aaron C 11/16/2023
为了与上述其他时间进行比较,您的惰性方法出现在“程序在 43.89767186300014 中执行”。比我的 polars 版本有了很好的改进。
1赞 jqurious 11/16/2023
谢谢@AaronC - 这与我的结果相匹配,基本上它将运行时间减半。也许它可以作为“性能问题”提交,因为 Polars 可以自动为您执行此操作,如果它始终是一种更快的方法。但我不确定它是否只是特定于这个特定任务(或者 Polars 最终在内部做了什么不同的事情)。我没有进一步调试它的知识。
1赞 jqurious 11/16/2023
只是为了跟进我之前的评论,在其他基准测试中测试该方法,它最终会变慢。因此,它似乎可以更快完全取决于特定于任务。pl.collect_all()
1赞 jqurious 11/19/2023
@AaronC我很欣赏更新。我发布此内容的主要原因只是为了解决不使用 + 的问题。这个用例需要所有东西“在内存中”,在这种情况下,你可以直接处理对象。它太大了,无法发表评论,所以我不得不发布一个“答案”(这可能是一个“半答案”,所以我没想到它会被接受。.select().map_batches()pl.Series