从矩阵中查找第一个匹配子矩阵的快速方法

A quick way to find the first matching submatrix from the matrix

提问人:ojipadeson 提问时间:8/15/2023 最后编辑:ojipadeson 更新时间:8/16/2023 访问量:125

问:

我的矩阵很简单,比如:

# python3 numpy
>>> A
array([[0., 0., 1., 1., 1.],
       [0., 0., 1., 1., 1.],
       [0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0.]])
>>> P
array([[0., 0., 0., 0.]])

我需要在 A 中找到一个与 P (1x4) 大小相同的全零区域(一个就足够了)。 所以正确的答案包括:

(2, 0)  # The vertex coordinates of the all-zero rectangular region that P can be matched
(2, 1)
(3, 0)
(3, 1)
(4, 0)
(4, 1)
# Just get any 1 answer

实际上,我的 A 矩阵将达到 30,000*30,000 的大小。我担心如果写成循环语句会很慢。有什么捷径吗?

P的大小不确定,从10*30到4000*80。同时,A 矩阵缺乏规律性,从任何点循环可能需要遍历整个矩阵才能成功匹配

Python Numpy 矩阵 比较

评论

0赞 Julien 8/15/2023
我的假设是,这取决于您期望匹配存在的可能性/密度。我相信(不完全确定)numpy 中的任何矢量化代码都会计算整个矩阵。因此,循环会很快,但一旦找到第一个解决方案,它就不会立即退出。如果你用 python 编写循环,它可能会更慢,但你可以很容易地尽早存在。
0赞 Karl Knechtel 8/15/2023
总是有这个大小和形状,还是你也需要概括一下?P
0赞 ojipadeson 8/15/2023
@KarlKnechtel并非总是如此
0赞 ojipadeson 8/15/2023
@Julien 如果有一个有效的 numpy 方法,我可以将其与循环进行比较。因为其实,左上角匹配和右下角匹配的情况很多。无论你从哪里开始循环,你都可能遇到最坏的情况
0赞 ken 8/15/2023
矩阵可以达到多大?在您的示例中,矩阵只有 0 或 1,您的实际矩阵是否相同?P

答:

0赞 Jérôme Richard 8/15/2023 #1

首先,完全分析可能很昂贵,特别是因为内存很大(所以它可以装在RAM中)并且RAM很慢。我们可以逐行扫描 A,以便找出可以适合 where 和 的最后一行。事情可能很大,所以我们需要一种快速的方法,所以检查一下。AAPnpAnp,mp = P.shapena,ma = A.shapenp

为了使问题更易于理解,让我们假设我们可以(有效地)预先计算。现在的问题是如何找到一个大小至少为 的值的 2D 区域。为此,我们可以计算最后几行的逻辑 AND,并检查结果中是否存在至少值的序列。例如,如果 和 ,那么我们可以计算每个可能的 ,然后检查每个是否有 4 个连续的 。此操作只不过是带有逻辑 AND 运算符的 2D 卷积,上述优化(大幅降低算法复杂性)称为滤波器分离B = A == 0True(np,mp)npBmpnp=3mp=4mask = B[i] & B[i+1] & B[i+2]imaskTrue

事实上,请注意,布尔数的逻辑 AND 等价于 0-1 值的乘积。这意味着我们可以使用二维快速傅里叶变频 (FFT) 来加速卷积。虽然 2D FFT 可用于以最佳算法(及时)计算这个问题,但这需要大量的内存,我们无法将它们与扫描线策略相结合。我们可以找到另一种策略,使用更少的内存,但代价可能是更多的计算(和不太理想的复杂性)。O(na ma ((log na) + (log ma)))

可以看出,连续的行大多计算相同的东西,尤其是当大的时候。事实上,对于我们计算,对于 ,我们计算 。 被重新计算两次。对于 k 个连续的行,将共享/重新计算项。因此,如果很大,最好对运算进行因式分解,以免重新计算许多项。masknpi=0mask = B[0] & B[1] & B[2]i=1mask = B[1] & B[2] & B[3]B[1] & B[2]np-knp

要做到这一点,一个有效的方法是建立一个包含部分约简的二进制树。例如,我们可以计算 和 然后 ,所以如果我们想计算,我们可以重用二叉树节点。我们可以证明节点值足以计算(即最后一行的交点)。这意味着该解决方案是有效的(类似于 FFT 解决方案)。但是,实施起来并不简单。node0 = B[0] & B[1]node1 = B[2] & B[3]node2 = node0 & node1B[0] & B[1] & B[2]O(log np)masknp

一个更简单、效率较低的解决方案是按大小计算线的交点。 需要仔细选择:足够大以避免多次重新计算相同的东西,足够小以使块真正有用。 对于相对较大的值来说,这当然是一个很好的解决方案(当非常小的时候,不使用块肯定更好,比如)。ccc = int(ceil(sqrt(np)/2))npnp<=4

请注意,您可以将布尔值打包成位,以便使线交点明显更快,因为生成的数组要小 8 倍。这可以使用 来完成。使用此策略,布尔值的逻辑 AND 被按位 AND 替换(如果不是实际上更快,它同样快)。虽然长度为 30_000 的行需要 234 KiB 的 RAM,但关联的布尔值只能存储在 4 KiB 中。后者可以更好地适应 CPU 缓存,从而加快计算速度。np.packbitsA

请注意,当找到匹配的扫描线时,可以提前停止计算。另请注意,如果 ,那么首先转置可能会快得多。mp << npA

为了提高性能,可以使用 Cython 或 Numba 来有效地计算(CPython 循环很慢,但不是 Cython/Numba 的循环)。

0赞 Alain T. 8/16/2023 #2

要查找具有特定高度/宽度的所有零个矩形的坐标,您可以将 1 和 0 水平转换为连续 0 的数量,并将其与目标宽度进行比较。然后在水平比较结果的 Trues 上垂直执行相同的操作。累积到最小垂直大小的所有坐标将表示零个矩形的右下角:

import numpy as np

A = np. array([
       [0., 0., 1., 1., 1.],
       [0., 0., 1., 1., 1.],
       [0., 0., 0., 0., 0.],
       [0., 1., 0., 0., 1.],
       [0., 0., 0., 0., 0.]])

height,width = 3,2


H0 = (A == 0)*(np.arange(A.shape[1])+1)[None,:]
H1 = (A != 0)*(np.arange(A.shape[1])+1)[None,:]
Hz = H0 - np.maximum.accumulate(H1,axis=1)

V0 = (Hz >= width)*(np.arange(A.shape[0])+1)[:,None]
V1 = (Hz <  width)*(np.arange(A.shape[0])+1)[:,None]
V2 = V0 - np.maximum.accumulate(V1,axis=0)

bottomRight = np.where(V2 >= height)
topLeft     = bottomRight - np.array([[height-1],[width-1]])

print(*zip(*topLeft))
print(*zip(*bottomRight))

(0, 0) (2, 2)
(2, 1) (4, 3)

通过检查中间结果,这可能更容易掌握:

水平连续 0 的 Hz 数(1 为负值)

array([[ 1,  2, -3, -4, -5],
       [ 1,  2, -3, -4, -5],
       [ 1,  2,  3,  4,  5],
       [ 1, -2,  1,  2, -5],
       [ 1,  2,  3,  4,  5]])

值 2(或更多)对应于与宽度要求匹配的水平连续零的结束位置

V2 水平大小的垂直连续匹配数 (Hz >= 宽度)

array([[-1,  1, -1, -1, -1],
       [-2,  2, -2, -2, -2],
       [-3,  3,  1,  1,  1],
       [-4, -4, -4,  2, -4],
       [-5,  1,  1,  3,  1]])

包含 3 个(或更多)的位置对应于大小为 3x2 的零个矩形的右下角

性能

这种“全矩阵”方法在非常大的数据上很慢(30,000 x 30,000 需要 220 秒)。由于您只查找一个匹配项,因此混合矢量化/循环方法可能会提供更好的结果:

V2       = np.zeros(A.shape[1])
rowRange = np.arange(A.shape[1])+1
for r,row in enumerate(A==0):
    H0 = row*rowRange
    H1 = (~row)*rowRange
    Hz = (H0 - np.maximum.accumulate(H1)) >= width
    V2 = V2*Hz + Hz
    if np.any(V2>=height):
        print(time()-start,"found",(r,np.where(V2>=height)[0][0]))
        break

这种混合方法使用相同的技术,但逐行而不是在整个矩阵上进行。如果矩阵不是正方形,您可以通过选择基于较小维度的按列或按行处理来进一步优化。由于 Pyhton 循环比矢量化 numpy 计算慢,因此这应该会最大限度地减少 Python 代码开销。

在 30,000 x 30,000 矩阵上,当找不到任何匹配项时,只花了 9 秒(这是最坏的情况)。它在较小的尺寸(例如 500 x 500)上并不快,但它确实减少了 numpy 操纵的内存量并最终赶上。

在这两种情况下,速度都与 P 的大小无关。

2赞 ken 8/16/2023 #3

正如@Julien在评论中指出的那样,一般来说,我们可以使用滑动窗口来完成此类任务。

def find_all_zero_region_by_sliding_window(a, shape):
    x, y = np.nonzero(np.lib.stride_tricks.sliding_window_view(a, shape).max(axis=-1).max(axis=-1) == 0)
    return np.stack((x, y), axis=-1)


find_all_zero_region_by_sliding_window(A, P.shape)

但是,不幸的是,这需要大量内存。

numpy.core._exceptions.MemoryError: Unable to allocate 11.3 TiB for an array with shape (26001, 29921, 4000) and data type float32
                                                       ^^^^^^^^

As an alternative, I think using the Summed-area table is a good idea.

It is similar to the sliding window approach above, but instead of finding the maximum value, we can calculate the sum (very efficiently) and search for the position where it is zero. Note that this assumes that does not contain any negative values. Otherwise, you would have to use .Anumpy.abs

Since we do not need to be able to calculate the sum of any given position, I adapted this idea and implemented it to require only a single-line cache.

import numpy as np
from typing import Tuple


def find_all_zero_region(arr: np.ndarray, kernel_size: Tuple[int, int]) -> np.ndarray:
    input_height, input_width = arr.shape
    kernel_height, kernel_width = kernel_size

    matches = []

    # Calculate summed_line for y==0.
    summed_line = arr[:kernel_height].sum(axis=0)

    for y in range(input_height - kernel_height + 1):
        # Update summed_line for row y.
        if y != 0:  # Except y==0, which already calculated above.
            # Adding new row and subtracting old row.
            summed_line += arr[y + kernel_height - 1] - arr[y - 1]

        # Calculate kernel_sum for (y, 0).
        kernel_sum = summed_line[:kernel_width].sum()
        if kernel_sum == 0:
            matches.append((y, 0))

        # Calculate kernel_sum for (y, 1) to (y, right-edge).
        # Using the idea of a summed-area table, but in 1D (horizontally).
        (all_zero_region_cols,) = np.nonzero(kernel_sum + np.cumsum(summed_line[kernel_width:] - summed_line[:-kernel_width]) == 0)
        for col in all_zero_region_cols:
            matches.append((y, col + 1))

    if not matches:
        # For Numba, output must be a 2d array.
        return np.zeros((0, 2), dtype=np.int64)
    return np.array(matches, dtype=np.int64)

As you can see, this uses loops, but it should be much faster than you think because the required memory is relatively small and the number of calculations/comparisons is greatly reduced. Here is some timing code.

import time


rng = np.random.default_rng(0)
A = rng.integers(0, 2, size=(30000, 30000)).astype(np.float32)

P = np.zeros(shape=(4000, 80))

# Create an all-zero region in the bottom right corner which will be searched last.
A[-P.shape[0] :, -P.shape[1] :] = 0

started = time.perf_counter()
result = find_all_zero_region(A, P.shape)
print(f"{time.perf_counter() - started} sec")
print(result)
# 3.541154200000001 sec
# [[26000 29920]]

Moreover, this function can be even faster by using Numba. Just add annotations as follows:

import numba


@numba.njit("int64[:,:](float32[:,:],UniTuple(int64,2))")
def find_all_zero_region_with_numba(arr: np.ndarray, kernel_size: Tuple[int, int]) -> np.ndarray:
    ...
started = time.perf_counter()
find_all_zero_region_with_numba(A, P.shape)
print(f"{time.perf_counter() - started} sec")
# 1.6005743999999993 sec

Note that I implemented it to find all positions of the all-zero regions, but you can also make it return on the first one. Since it uses loops, the average execution time will be even faster.