提问人:jacoo 提问时间:9/13/2023 最后编辑:OCajacoo 更新时间:9/20/2023 访问量:72
高效跟踪数据修订
Keep track of data revisions efficiently
问:
我得到一个像 .datetime,value
API 为我提供了整个历史记录,大约 10,000 个观测值,并且大约每小时都会添加新的观测值。
我需要将新数据添加到数据帧中,并对其运行一些分析并保存 df。通常,当我运行代码时,我只会得到新的观察结果。
有时一些旧的观察结果会被修改,即 API 可能会更改我已经下载和处理的一些观察结果。
提要不会标记此类修订,我需要跟踪它。假设我得到 2 个数据修订(通过对 API 的不同调用),每个修订都更改了2023-09-12 08:00
然后,我需要在我的 df 中包含以下行
date, value, revision
2023-09-12 08:00, 1000, 0
2023-09-12 08:00, 2000, 1 # <- this is a revised observation.
2023-09-12 08:00, 1000, 2 # <- this is a second revised observation.
2023-09-13 06:00, 1500, 0`
通过循环 df 并查看是否有更改,很容易识别和标记修订,但这很慢。
只是想知道是否有一种快速而聪明的方法可以做到这一点
答:
1赞
OCa
9/15/2023
#1
让我们确保我们有一个理解:
假设有 2 个数据帧,
- 一个是跟踪更改的存储库
- 另一个是新数据的馈送,通常包含
- 旧时间戳(历史),
- 修改的时间戳(修订版)
- 新时间戳
然后,将对源进行过滤,以仅保留非冗余时间戳,按修订编号,并附加到存储库中。
输入数据
存储 库:
repo = pd.DataFrame(columns = ['date', 'value', 'revision'],
data = [['2023-09-01 08:00', 500, 0],
['2023-09-02 08:00', 1000, 0],
['2023-09-02 08:00', 2000, 1]]
).astype({'date':'datetime64[ns]'})
date value revision
0 2023-09-01 08:00:00 500 0
1 2023-09-02 08:00:00 1000 0
2 2023-09-02 08:00:00 2000 1
饲料:
feed = pd.DataFrame(columns = ['date', 'value'],
data = [['2023-09-01 08:00', 500], # unchanged
['2023-09-02 08:00', 1000], # revision: back to 1000
['2023-09-03 08:00', 1500]] # new timestamp
).astype({'date':'datetime64[ns]'})
date value
0 2023-09-01 08:00:00 500
1 2023-09-02 08:00:00 1000
2 2023-09-03 08:00:00 1500
加工
第 1 步:连接存储库和源,但在临时列中记录每行的源代码
校订:
revi = pd.concat([repo.assign(source='repo'),
feed.assign(source='feed')]
).sort_values(by='date').reset_index(drop=True)
date value revision source
0 2023-09-01 08:00:00 500 0.0 repo
1 2023-09-01 08:00:00 500 NaN feed
2 2023-09-02 08:00:00 1000 0.0 repo
3 2023-09-02 08:00:00 2000 1.0 repo
4 2023-09-02 08:00:00 1000 NaN feed
5 2023-09-03 08:00:00 1500 NaN feed
第2步。分配修订号
有点技术性,简单地说,每组具有相同时间戳的行的大小 () 用于计算修订号。len
aggf = len
revi.loc[revi.revision.isna(), 'revision'] = revi.set_index('date')['revision'].groupby(level=0).agg(aggf).values - 1
date value revision source
0 2023-09-01 08:00:00 500 0.0 repo
1 2023-09-01 08:00:00 500 1.0 feed
2 2023-09-02 08:00:00 1000 0.0 repo
3 2023-09-02 08:00:00 2000 1.0 repo
4 2023-09-02 08:00:00 1000 2.0 feed
5 2023-09-03 08:00:00 1500 0.0 feed
第 3 步:移除非新信息的 Feed 行
revi = revi.drop_duplicates(subset=['date','source'], keep='last' # keep only the latest revision for each repo timestamp
).drop_duplicates(subset=['date','value'], keep='first') # remove feed rows that only repeated the repo.
date value revision source
0 2023-09-01 08:00:00 500 0.0 repo
3 2023-09-02 08:00:00 2000 1.0 repo
4 2023-09-02 08:00:00 1000 2.0 feed
5 2023-09-03 08:00:00 1500 0.0 feed
第 4 步:通过删除存储库行,然后删除“源”列来最终清理修订数据帧
revi = revi[revi.source=='feed'].drop(columns='source')
date value revision
4 2023-09-02 08:00:00 1000 2.0
5 2023-09-03 08:00:00 1500 0.0
这些是当前源中需要提交到存储库的 2 行:
repo = pd.concat([repo, revi]
).sort_values(by=['date','revision']
).reset_index(drop=True)
评论
0赞
jacoo
9/15/2023
哇,太棒了,干得好!
0赞
OCa
9/15/2023
@jacoo 不客气。这是一个很好的问题,寻找解决方案是令人愉快的。在分配修订号之前过滤掉冗余的源行可能会获得额外的速度。因为总体上修订可能很少见。提供的答案应该已经代表了显着的速度提升。
评论