提问人:bproxauf 提问时间:7/25/2022 最后编辑:bproxauf 更新时间:8/1/2022 访问量:335
在迭代部分更新的数组中重复查找 K 个最大值索引的最快方法
Fastest way to repeatedly find indices of K largest values in an iteratively partially updated array
问:
在包含元素的复值数组中,我反复(迭代)更新元素。每次迭代后,在绝对平方实值数组中,我需要找到最大值的索引(可以假设是小的,当然,在实践中可能)。索引不需要排序。a
nsel = ~750000
>~10^6
nchange < ~1000
b
K
K
K <= ~50
K <= ~10
K
更新的值及其索引在每次迭代中都会发生变化,并且它们依赖于对应于最大值及其索引的(先验)未知元素。尽管如此,让我们假设它们本质上是随机的,除了一个特定元素(通常是(一个)最大值)总是包含在更新的值中。重要提示:更新后,新的最大值可能位于未更新的元素中。a
b
下面是一个最小的例子。为简单起见,它仅演示了 10^6(循环)迭代中的一个。我们可以使用 (for ) 或 (任意 ,一般情况,参见 https://stackoverflow.com/a/23734295/5269892) 找到最大值的索引。但是,由于 () 的大小较大,遍历整个数组以查找最大值的索引非常慢。结合大量的迭代,这形成了我正在使用的更大代码(非线性反卷积算法 CLEAN)的瓶颈,该代码嵌入了此步骤。K
b.argmax()
K = 1
b.argpartition()
K
b
nsel
我已经问过如何最有效地找到最大值(情况)的问题,请参阅 Python 在部分更改的数组中查找最大值索引的最有效方法。公认的解决方案依赖于仅通过将数据拆分为块并(重新)计算仅更新某些元素的块的最大值来部分访问。从而实现了加速。K = 1
b
> 7x
根据作者 @Jérôme Richard(感谢您的帮助!)的说法,不幸的是,这个解决方案不容易推广到 .正如他所建议的,一个可能的替代方案可能是二叉搜索树。现在我的K > 1
问题:这样的二叉树在实践中是如何实现的,我们如何最有效地(如果可能的话,很容易)找到最大值的索引?您是否有其他解决方案来以最快的方式在部分更新的数组中重复查找 K
个最大值的索引?
注意:在每次迭代中,我稍后将再次需要(或它的副本)作为 numpy 数组。如果可能的话,解决方案应该主要是基于python的,从python调用C或使用Cython或没问题。我目前使用 .b
numba
python 3.7.6, numpy 1.21.2
import numpy as np
# some array shapes ('nnu_use' and 'nm'), number of total values ('nvals'), number of selected values ('nsel';
# here 'nsel' == 'nvals'; in general 'nsel' <= 'nvals') and number of values to be changed ('nchange' << 'nsel')
nnu_use, nm = 10418//2 + 1, 144
nvals = nnu_use * nm
nsel = nvals
nchange = 1000
# number of largest peaks to be found
K = 10
# fix random seed, generate random 2D 'Fourier transform' ('a', complex-valued), compute power ('b', real-valued),
# and two 2D arrays for indices of axes 0 and 1
np.random.seed(100)
a = np.random.rand(nsel) + 1j * np.random.rand(nsel)
b = a.real ** 2 + a.imag ** 2
inu_2d = np.tile(np.arange(nnu_use)[:,None], (1,nm))
im_2d = np.tile(np.arange(nm)[None,:], (nnu_use,1))
# select 'nsel' random indices and get 1D arrays of the selected 2D indices
isel = np.random.choice(nvals, nsel, replace=False)
inu_sel, im_sel = inu_2d.flatten()[isel], im_2d.flatten()[isel]
def do_update_iter(a, b):
# find index of maximum, choose 'nchange' indices of which 'nchange - 1' are random and the remaining one is the
# index of the maximum, generate random complex numbers, update 'a' and compute updated 'b'
imax = b.argmax()
ichange = np.concatenate(([imax],np.random.choice(nsel, nchange-1, replace=False)))
a_change = np.random.rand(nchange) + 1j*np.random.rand(nchange)
a[ichange] = a_change
b[ichange] = a_change.real ** 2 + a_change.imag ** 2
return a, b, ichange
# do an update iteration on 'a' and 'b'
a, b, ichange = do_update_iter(a, b)
# find indices of largest K values
ilarge = b.argpartition(-K)[-K:]
答:
我尝试实现基于 C++ 容器的 Cython 解决方案(用于 64 位浮点值)。好消息是它比天真的要快。坏消息是它非常复杂,而且速度并不快:快 3~4 倍。np.argpartition
一个主要问题是 Cython 没有实现最有用的容器。可以使用类型实现此容器,但它会使代码更加复杂,并且效率也较低(由于内存中额外的缓存不友好的间接)。如果可以保证 中没有重复项,那么性能会明显更好(最多 x2),因为可以改用。此外,Cython 似乎不接受最近的 C++11/C++17/C++20 功能,使代码读/写起来更加繁琐。这很可悲,因为 [一些功能,如和 rvalues-references] 可以使代码更快。std::multimap
std::map<Key, std::vector<Value>>
b
std::map
extract
另一个主要问题是执行时间受缓存未命中(在我的机器上为 >75%)的限制,因为二进制 RB 树对缓存不友好。问题是整体数据结构很可能比 CPU 缓存大。事实上,至少需要存储键值,更不用说存储树数据结构的节点指针需要类似数量的内存,并且大多数处理器缓存都小于 30 MB。这主要是由于随机访问而导致的更新期间的一个问题:每次查找/插入都需要在 RAM 中获取,并且 RAM 的延迟通常为几十纳秒。此外,(C++) RB 树不支持密钥更新,因此需要删除+插入。我试图使用并行预取方法来缓解这个问题。不幸的是,它在实践中通常较慢......750_000*(8*2+4) = 15_000_000 bytes
log2(nsel)
在实践中,K 最大项的提取速度非常快(树中的 1000 个项和 750_000 个值大约需要几微秒),而更新大约需要 1.0-1.5 毫秒。同时,需要 ~4.5 毫秒。np.argpartition
一些人报告了(例如。这里)当项目数量相当大时,这实际上是相当慢的。因此,使用另一个非标准 C++ 实现可能是个好主意。在这种情况下,我希望 B 树会更快。Google Abseil 库包含这样的容器,它们的速度肯定要快得多。话虽如此,它肯定需要包装一些代码,这可能很乏味。或者,可以编写一个完整的 C++ 类并从 Cython 调用它。std::map
实现
下面是实现(以及最后的用法示例):
- maxtree.pyx:
# distutils: language = c++
import numpy as np
cimport numpy as np
cimport cython
# See: https://cython.readthedocs.io/en/latest/src/userguide/wrapping_CPlusPlus.html
from libcpp.vector cimport vector
from libcpp.map cimport map
from libcpp.pair cimport pair
from cython.operator cimport dereference as deref, preincrement as inc
@cython.boundscheck(False) # Deactivate bounds checking
@cython.wraparound(False) # Deactivate negative indexing
cdef class MaxTree:
cdef map[double, vector[int]] data
cdef int itemCount
# Build a tree from `b`
def __init__(self, double[::1] b):
cdef map[double, vector[int]].iterator it
cdef pair[double, vector[int]] node
cdef double val
cdef int i
# Temporary node used to ease insertion
node.second.resize(1)
# Iterate over `b` items so to add them in the tree
for i in range(b.size):
val = b[i]
it = self.data.find(val)
if it == self.data.end():
# Value not found: add a new node
node.first = val
node.second[0] = i
self.data.insert(node)
else:
# Value found: adds a new duplicate in an existing node
deref(it).second.push_back(i)
self.itemCount = b.size
def size(self):
return self.itemCount
# Get the index (in the original `b` array) of the K-largest values
def getKlargest(self, int count):
cdef map[double, vector[int]].reverse_iterator rit
cdef int vecSize
cdef int* vecData
cdef int i, j
cdef int[::1] resultView
if count > self.itemCount:
count = self.itemCount
result = np.empty(count, dtype=np.int32)
resultView = result
i = 0
rit = self.data.rbegin()
while rit != self.data.rend():
vecSize = deref(rit).second.size()
vecData = deref(rit).second.data()
# Note: indices are not always sorted here due to the update
for j in range(vecSize-1, -1, -1):
resultView[i] = vecData[j]
i += 1
count -= 1
if count <= 0:
return resultView
inc(rit)
return result
# Set the values of `b` at the index `index` to `values` and update the tree accordingly
def update(self, double[::1] b, int[::1] index, double[::1] values):
cdef map[double, vector[int]].iterator it
cdef pair[double, vector[int]] node
#cdef pair[map[double, vector[int]].iterator, bool] infos
cdef int idx, i, j, vecSize, indexSize
cdef double oldValue, newValue
cdef int* vecData
assert b.size == self.itemCount
assert index.size == values.size
assert np.min(index) >= 0 and np.max(index) < b.size
# Temporary node used to ease insertion
node.second.resize(1)
for i in range(index.size):
idx = index[i]
oldValue = b[idx]
newValue = values[i]
it = self.data.find(oldValue)
assert it != self.data.end()
# Update the tree
if deref(it).second.size() == 1:
# Remove the node from the tree and add a new one because keys are immutable
# Assume `index` is correct/coherent and the tree is correctly updated for sake of performance
#assert deref(it).second[0] == idx
self.data.erase(it)
node.first = newValue
node.second[0] = idx
infos = self.data.insert(node)
inserted = infos.second
if not inserted:
# Duplicate
it = infos.first
deref(it).second.push_back(idx)
else:
# Tricky case due to duplicates (untested)
vecData = deref(it).second.data()
vecSize = deref(it).second.size()
# Search the element and remove it
for j in range(vecSize):
if vecData[j] == idx:
vecData[j] = vecData[vecSize-1]
deref(it).second.pop_back()
break
# Update `b`
b[idx] = values[i]
- setup.py:
# setup.py
from setuptools import setup
from Cython.Build import cythonize
setup(ext_modules=cythonize("maxtree.pyx"))
- main.py:
# Usage:
import numpy as np
import maxtree
np.random.seed(0)
b = np.random.rand(750_000)
nchange = 1_000
ichange = np.random.randint(0, b.size, nchange).astype(np.int32)
tree = maxtree.MaxTree(b)
tree.getKlargest(nchange)
tree.update(b, ichange, b[ichange]*0.999)
- 要运行的命令:
python3 setup.py build_ext --inplace -q
评论
K < ~20 << nchange
3-4
argpartition
b
K
评论
>~10^6
SortedDict
sortedcontainers
b
b
std::multimap
std::multimap
std::map<Key, std::vector<Value>>