在 C++ 中从三个密集 Eigen::MatrixXi 中获取共同的第一个零值 indice

Get common first zero-value indice from three dense Eigen::MatrixXi in C++

提问人:DONTEN 提问时间:11/16/2023 最后编辑:DONTEN 更新时间:11/18/2023 访问量:49

问:

我有一个 NxN 非负称为 ,两个 Nx1 非负称为 和 。我想找到第一个索引 (i, j),使得 和 都是零。Eigen::MatrixXicost_matrixEigen::VectorXirowVectorcolVectorcost_matrix(i, j)rowVector(i)colVector(j)

我知道有一些幼稚的解决方案,比如遍历所有元素,但它们花费了太多时间。我想要 Eigen C++ 中最有效的方法。

这是我当前的代码,它比使用 .Eigen::DenseBase::visit

Eigen::MatrixXi cost_matrix(4,4);
cost_matrix<<1, 2, 3, 0, 
             5, 0, 0, 8,
             9, 8, 7, 6,
             0, 2, 1, 5;
Eigen::VectorXi rowCover(4);
Eigen::VectorXi colCover(4);
rowCover << 0, 0, 1, 1;
colCover << 1, 1, 0, 1;
//A data demo. Cost_matrix is a NxN nonnegative int matrix.
//RowCover and colCover are N nonnegative int vector.

int i, j;
cost_matrix.colwise() += rowCover;
cost_matrix.rowwise() += colCover.transpose();
Eigen::Index zeroRow, zeroCol;
int min = find_zero_matrix.transpose().minCoeff(&zeroCol, &zeroRow);
i = zeroRow;
j = zeroCol;
//the result should be
//i = 1
//j = 2
C++ 数据操作 特征

评论

0赞 Passer By 11/16/2023
您的描述与代码不符。编写代码描述通常比编写代码更糟糕。请创建一个最小的可重现示例
0赞 chtz 11/17/2023
只要你激活优化(也许在成功测试代码后停用断言),手动 C++ 通常不会比 Eigen 代码慢,除非 Eigen 知道一种巧妙优化问题的方法。不过,它可能更具可读性,也可能不更具可读性。
0赞 chtz 11/17/2023
您能否更改输入格式,例如,而不是存储,因为您可以直接存储零条目的索引(即,在 a 或 )中?rowCover[0 0 1 1]{0, 1}std::vectorEigen::VectorXi
0赞 Homer512 11/17/2023
一些问题: 1.您似乎将“第一”定义为尽可能低的行索引,列索引是第二优先级。您是否知道扫描此内容将导致列优先矩阵的迭代顺序次优?您是否能够转置输入数据和/或将矩阵更改为行优先矩阵?2. 与快速解决方案相比,您在多大程度上优先考虑优雅的解决方案?3. 您期望 0 的矩阵大小和百分比是多少?
0赞 DONTEN 11/17/2023
这将适用于视频流过程,矩阵大小 N 约为 20,但每帧将运行多次。0 的百分比约为 1/N(可能每行有几个 0)。

答:

1赞 Homer512 11/17/2023 #1

正如@chtz评论中已经建议的那样,Eigen 在这里不会真正帮助您。我们仍然可以尝试找到更快的版本。

这是我想出的:

首先,我扩展/修复您的代码以获得有效的参考实现。

/**
 * Reference implementation
 * 
 * Finds lowest row and column (prioritizes low row number)
 * where cost_matrix is zero and both row_cover and col_cover are zero
 * 
 * Returns {-1, -1} if no result is found.
 * The input arrays can be overwritten by the implementation as desired
 */
std::pair<int, int> reference(
      Eigen::MatrixXi& cost_matrix,
      Eigen::VectorXi& row_cover,
      Eigen::VectorXi& col_cover) noexcept
{
    int i, j;
    cost_matrix.colwise() += row_cover;
    cost_matrix.rowwise() += col_cover.transpose();
    Eigen::Index zeroRow, zeroCol;
    int min = cost_matrix.transpose().minCoeff(&zeroCol, &zeroRow);
    i = zeroRow;
    j = zeroCol;
    if(min) // no result found
        return {-1, -1};
    return {i, j};
}

遍历整个输入矩阵似乎非常浪费,除非覆盖向量通常为零,而输入矩阵不是。如果封面包含许多非零条目,最好将它们转换为零索引的向量。

/**
 * Overwrite the front of cover with indices where it was zero
 * 
 * The return value gives the total number of zeros found.
 * The values behind that are left in undefined state
 */
static Eigen::Index to_zero_indices(Eigen::VectorXi& cover) noexcept
{
    Eigen::Index outpos = 0;
    for(Eigen::Index i = 0, n = cover.size(); i < n; ++i) {
        /* Loop body is written to be branchless */
        int value = cover[i];
        cover[outpos] = static_cast<int>(i);
        outpos += ! value;
    }
    return outpos;
}

现在我们只需要检查矩阵中两个封面中都为零的条目。

/**
 * Alternative implementation based on sparse probing of the cost matrix
 * 
 * API-compatible to reference but in practice changes the cover vectors,
 * not the cost matrix
 */
std::pair<int, int> probe_indices(
      Eigen::MatrixXi& cost_matrix,
      Eigen::VectorXi& row_cover,
      Eigen::VectorXi& col_cover) noexcept
{
    const Eigen::Index rows_n = to_zero_indices(row_cover);
    const Eigen::Index cols_n = to_zero_indices(col_cover);
    for(int i: row_cover.head(rows_n))
        for(int j: col_cover.head(cols_n))
            if(! cost_matrix(i, j))
                return {i, j};
    return {-1, -1};
}

为获得最佳结果,请编译 with 以避免在各种索引操作中进行范围检查。-DNDEBUG

测试

矩阵大小 N 约为 20,但每帧将运行多次。0 的百分比约为 1/N(可能每行有几个 0)

我假设这个 1/N 不适用于覆盖向量。否则,您找到甚至一个条目的机会都非常低。我武断地决定用 4/N 作为封面,用 1/N 作为成本矩阵进行测试。

完整的测试和基准测试结果如下。在我的系统上,使用所选参数集,我的版本速度大约快 10 倍,即使我将封面更改为全零,它的速度也快了 7.5 倍。即使在绝对最坏的情况下 - 覆盖全为零,矩阵全为1 - 它的速度仍然是两倍。

#include <Eigen/Dense>

#include <chrono>
#include <iostream>
#include <random>
// using std::default_random_engine, std::bernoulli_distribution
#include <utility>
// using std::pair


/**
 * Fill the output with zeros and ones depending on the percentage
 *
 * Results are random but reproducible across process executions.
 * Input percentage is expected to be between 0 and 1
 */
void fill_pct(Eigen::Ref<Eigen::MatrixXi> out, double pct_ones)
{
    static std::default_random_engine::result_type seed = 0xdeadbeef;
    std::default_random_engine rng {seed++};
    std::bernoulli_distribution distr {pct_ones};
    out = Eigen::MatrixXi::NullaryExpr(
          out.rows(), out.cols(),
          [&]() -> int { return distr(rng); });
}
int main()
{
    using clock_t = std::chrono::steady_clock;
    const int size = 20;
    const double pct_zero = 1. / size;
    const double pct_zero_cover = 4. / size;
    const int repetitions = 1000000;
    Eigen::MatrixXi cost_matrix, matrix_copy;
    Eigen::VectorXi row_cover, col_cover, row_copy, col_copy;
    clock_t::duration reference_time {}, bench_time {};
    for(int i = 0; i < repetitions; ++i) {
        cost_matrix.resize(size, size);
        fill_pct(cost_matrix, 1. - pct_zero);
        matrix_copy = cost_matrix;
        for(Eigen::VectorXi* cover: {&row_cover, &col_cover}) {
            cover->resize(size);
            fill_pct(*cover, 1. - pct_zero_cover);
        }
        row_copy = row_cover;
        col_copy = col_cover;
        clock_t::time_point t1 = clock_t::now();
        std::pair<int, int> bench_result = probe_indices(
              cost_matrix, row_cover, col_cover);
        clock_t::time_point t2 = clock_t::now();
        std::pair<int, int> ref_result = reference(
              matrix_copy, row_copy, col_copy);
        clock_t::time_point t3 = clock_t::now();
        bench_time += t2 - t1;
        reference_time += t3 - t2;
        if(bench_result != ref_result)
            std::cout << bench_result.first << " != " << ref_result.first
                      << ' ' << bench_result.second << " != " << ref_result.second
                      << '\n';
    }
    using std::chrono::milliseconds;
    std::cout << "Reference " << std::chrono::duration_cast<milliseconds>(
                    reference_time).count() * 1e-3
              << "\nBench " << std::chrono::duration_cast<milliseconds>(
                    bench_time).count() * 1e-3
              << '\n';
}

其他要点

  • 正如注释中所讨论的,最好转置输入并查找第一列具有优先权的第一个条目。然而,只有当矩阵足够大,数据的局部性成为一个更大的问题时,这才真正成为一个大问题
  • 我不认为显式矢量化有多大意义。我觉得这可以通过收集指令来完成,但这只有在矩阵通常为零且封面为零的情况下才会真正起作用
  • 在行和列覆盖率大多为零的情况下,我们可以反转逻辑并首先扫描矩阵。这也将受益于显式矢量化

补遗: 我使用显式 AVX512 矢量化进行了一些测试。首先,利用.这稍微快一点,但并不值得。其次,我用一些矢量化扫描优化了上面的最坏情况。在这种特定情况下,它的性能翻了一番,但支持这两种模式并决定是使用扫描还是常规操作,都有足够的开销使常规情况的速度减半。所以不是很有用。to_zero_indicesvpcompressd