高效跟踪数据修订

Keep track of data revisions efficiently

提问人:jacoo 提问时间:9/13/2023 最后编辑:OCajacoo 更新时间:9/20/2023 访问量:72

问:

我得到一个像 .datetime,value

API 为我提供了整个历史记录,大约 10,000 个观测值,并且大约每小时都会添加新的观测值。

我需要将新数据添加到数据帧中,并对其运行一些分析并保存 df。通常,当我运行代码时,我只会得到新的观察结果。

有时一些旧的观察结果会被修改,即 API 可能会更改我已经下载和处理的一些观察结果。

提要不会标记此类修订,我需要跟踪它。假设我得到 2 个数据修订(通过对 API 的不同调用),每个修订都更改了2023-09-12 08:00

然后,我需要在我的 df 中包含以下行

date, value, revision
2023-09-12 08:00, 1000, 0
2023-09-12 08:00, 2000, 1 # <- this is a revised observation. 
2023-09-12 08:00, 1000, 2 # <- this is a second revised observation. 
2023-09-13 06:00, 1500, 0`

通过循环 df 并查看是否有更改,很容易识别和标记修订,但这很慢。

只是想知道是否有一种快速而聪明的方法可以做到这一点

Python Pandas 数据帧 时间序列 python-datetime

评论


答:

1赞 OCa 9/15/2023 #1

让我们确保我们有一个理解:

假设有 2 个数据帧,

  • 一个是跟踪更改的存储库
  • 另一个是新数据的馈送,通常包含
    • 旧时间戳(历史),
    • 修改的时间戳(修订版)
    • 新时间戳

然后,将对进行过滤,以仅保留非冗余时间戳,按修订编号,并附加到存储库中。


输入数据

存储 库:

repo = pd.DataFrame(columns = ['date', 'value', 'revision'],
                    data = [['2023-09-01 08:00', 500, 0],
                            ['2023-09-02 08:00', 1000, 0],
                            ['2023-09-02 08:00', 2000, 1]]
                   ).astype({'date':'datetime64[ns]'})

                 date  value  revision
0 2023-09-01 08:00:00    500         0
1 2023-09-02 08:00:00   1000         0
2 2023-09-02 08:00:00   2000         1

饲料:

feed = pd.DataFrame(columns = ['date', 'value'],
                    data = [['2023-09-01 08:00', 500],   # unchanged
                            ['2023-09-02 08:00', 1000],  # revision: back to 1000
                            ['2023-09-03 08:00', 1500]]  # new timestamp
                   ).astype({'date':'datetime64[ns]'}) 

                 date  value
0 2023-09-01 08:00:00    500
1 2023-09-02 08:00:00   1000
2 2023-09-03 08:00:00   1500

加工

第 1 步:连接存储库和源,但在临时列中记录每行的源代码

校订:

revi = pd.concat([repo.assign(source='repo'), 
                  feed.assign(source='feed')]
                ).sort_values(by='date').reset_index(drop=True)

                 date  value  revision source
0 2023-09-01 08:00:00    500       0.0   repo
1 2023-09-01 08:00:00    500       NaN   feed
2 2023-09-02 08:00:00   1000       0.0   repo
3 2023-09-02 08:00:00   2000       1.0   repo
4 2023-09-02 08:00:00   1000       NaN   feed
5 2023-09-03 08:00:00   1500       NaN   feed

第2步。分配修订号

有点技术性,简单地说,每组具有相同时间戳的行的大小 () 用于计算修订号。len

aggf = len

revi.loc[revi.revision.isna(), 'revision'] = revi.set_index('date')['revision'].groupby(level=0).agg(aggf).values - 1

                 date  value  revision source
0 2023-09-01 08:00:00    500       0.0   repo
1 2023-09-01 08:00:00    500       1.0   feed
2 2023-09-02 08:00:00   1000       0.0   repo
3 2023-09-02 08:00:00   2000       1.0   repo
4 2023-09-02 08:00:00   1000       2.0   feed
5 2023-09-03 08:00:00   1500       0.0   feed

第 3 步:移除非新信息的 Feed 行

revi = revi.drop_duplicates(subset=['date','source'], keep='last'   # keep only the latest revision for each repo timestamp
          ).drop_duplicates(subset=['date','value'],  keep='first') # remove feed rows that only repeated the repo.

                 date  value  revision source
0 2023-09-01 08:00:00    500       0.0   repo
3 2023-09-02 08:00:00   2000       1.0   repo
4 2023-09-02 08:00:00   1000       2.0   feed
5 2023-09-03 08:00:00   1500       0.0   feed

第 4 步:通过删除存储库行,然后删除“源”列来最终清理修订数据帧

revi = revi[revi.source=='feed'].drop(columns='source')

                 date  value  revision
4 2023-09-02 08:00:00   1000       2.0
5 2023-09-03 08:00:00   1500       0.0

这些是当前源中需要提交到存储库的 2 行:

repo = pd.concat([repo, revi]
        ).sort_values(by=['date','revision']
        ).reset_index(drop=True)

评论

0赞 jacoo 9/15/2023
哇,太棒了,干得好!
0赞 OCa 9/15/2023
@jacoo 不客气。这是一个很好的问题,寻找解决方案是令人愉快的。在分配修订号之前过滤掉冗余的源行可能会获得额外的速度。因为总体上修订可能很少见。提供的答案应该已经代表了显着的速度提升。