如何使用例如“std::transform”对并行 C 样式的矩阵向量乘法实现进行现代化改造?

How to modernize a parallel C-style matrix-vector multiplication implementation using e.g. `std::transform`?

提问人:Andreas Hadjigeorgiou 提问时间:9/17/2023 最后编辑:user17732522Andreas Hadjigeorgiou 更新时间:9/18/2023 访问量:81

问:

我正在尝试用更现代的方法来调整我的编程实践,这些方法利用了 STL 容器、算法、执行策略等。

我开发了一个小型测试来试验对基本矩阵向量乘法的 C 风格实现进行现代化改造:,它与 OpenMP 并行化。在这里,遵循高级主程序,我在其中设置问题的参数,用一些随机值初始化向量,然后以两种不同的方法执行矩阵向量乘法。C[N] = A[N,M] * B[N]

int main(){

    size_t N(9), M(5);

    std::vector<double> A(N*M), B(M), C(N);

    fill_random(A, 1.0, 1.0); // if min==max the vector is filled with that particular value, otherwise, it's randomly sampled with values in the range [min,max]
    fill_random(B, 1.0, 1.0);

// Run for-loop based routine (parallel OpenMP)
    std::fill(C.begin(), C.end(), 0.0);
    bool err_for = run_for_loop_based_solution(A, B, C, N, M);
    if(!err_for)
        std::cout << "For-loop solution executed successfully!" << std::endl;

    std::for_each(C.begin(), C.end(), [](double x){
        std::cout << x << " ";
    });
    std::cout << std::endl;

// Run transform-based solution 
    std::fill(C.begin(), C.end(), 0.0);
    bool err_task = run_transform_based_based_solution(A, B, C, N, M);
    if(!err_task)
        std::cout << "Transform-based solution executed successfully!" << std::endl;

    std::for_each(C.begin(), C.end(), [](double x){
    std::cout << x << " ";
    });
    std::cout << std::endl;

    return 0;
}

现在,让我们来看看这两个实现!

参考实现,即使用基于 for 循环的 C 样式实现开发。计算与 OpenMP 线程并行化,使得每个线程计算输出向量的一个或多个元素的值。简单!run_for_loop_based_solution(A, B, C, N, M);C[N]

template<typename T>
bool run_for_loop_based_solution(std::vector<T> &A, std::vector<T> &B, std::vector<T> &C, size_t N, size_t M)
{
    if (A.size() != N*M) return 1;
    if (B.size() != M) return 1;
    if (C.size() != N) return 1;

    #pragma omp parallel for
    for(size_t i=0; i<N; ++i)
        for(size_t j=0; j<M; ++j)
            C[i] += A[i*M + j] * B[j];
    
    return 0;
}

现在,我想使用 开发一个实现,即 ,它提供了选择执行策略以启用并行性的选项,例如 .像这样的东西:std::transformrun_transform_based_based_solution(A, B, C, N, M);std::execution::par

template<typename T>
bool run_transform_based_based_solution(std::vector<T> &A, std::vector<T> &B, std::vector<T> &C, size_t N, size_t M)
{
    if (A.size() != N*M) return 1;
    if (B.size() != M) return 1;
    if (C.size() != N) return 1;

    // std::transform(std::execution::par, C.begin(), C.end(), C.begin(), [&](double x){
    //     return ...  
    // });

    return 0;
}

在这一点上我很挣扎,因为我无法理解如何在索引的主体或与每个元素的计算相关的范围内传播。是否可以按照这种方法(?)进行操作,或者除了方法之外,是否有其他选择?std::transformstd::transform

注意:要运行代码,只需将上述代码片段包含在单个文件中,然后使用 .此外,在开头包含以下代码片段:main.cpp-std==c++17

#include <iostream>
#include <algorithm>
#include <vector>
#include <random>
#include <execution>

// fill a std::vector with random values in the range [min, max]. if min==max the vector is filled with that particular value.
template<typename T>
void fill_random(std::vector<T> &v, T min, T max){
    std::random_device rd;
    std::mt19937 gen(rd());
    std::uniform_real_distribution<T> dist(min, max);
    std::transform(v.begin(), v.end(), v.begin(), [&](T x){
        return dist(gen);
    });
}
C++ 并行处理 C++17 标准

评论


答:

1赞 Nimrod 9/18/2023 #1

要获取索引,您可以使用此答案的方法

    std::transform(std::execution::par, C.begin(), C.end(), C.begin(),
        [&A, &B, &C, M](T& i) -> T {
            auto index = &i - &C[0]; // Calculate the row index
            return std::inner_product(A.begin() + index * M, A.begin() + (index + 1) * M, B.begin(), i);
    });

演示

评论

0赞 Andreas Hadjigeorgiou 9/19/2023
谢谢,似乎工作正常!我需要深入研究这个片段的细节,因为这个实现超出了我的范围......一个问题:在这种情况下有什么作用?我看到代码可以编译,无论有没有它!-> T
0赞 Nimrod 9/19/2023
它的 lambda 的尾随返回类型。没有它,这里完全没问题。
0赞 Andreas Hadjigeorgiou 9/19/2023
我明白了,这是内产品的退货类型
0赞 Caleth 9/18/2023 #2

每个子范围的计算是 with 的子范围的 std::inner_product。如果你有一个块视图,那么它相当简单:C[i]AB

template<typename T, typename Execution>
bool run_transform_based_based_solution(Execution e, std::vector<T> &A, std::vector<T> &B, std::vector<T> &C, size_t N, size_t M)
{
    if (A.size() != N*M) return 1;
    if (B.size() != M) return 1;
    if (C.size() != N) return 1;

    const chunks = ranges::views::chunk(A, M);

    std::transform(e, chunks.begin(), chunks.end(), C.begin(), [e, &B](auto chunk){ 
        return std::inner_product(e, chunk.begin(), chunk.end(), B.begin(), B.end());
    });

    return 0;
}

评论

0赞 Andreas Hadjigeorgiou 9/19/2023
谢谢,你的答案。我试过了,但它没有编译!首先,应该在块之前有。然后,需求先于范围。但是,即使进行了这些更改,它也不会编译,因为它无法识别 .这就是我得到的相关错误const autostd::chunkserror: ‘chunk’ is not a member of ‘std::ranges::views’
0赞 Andreas Hadjigeorgiou 9/19/2023
我的编译器和操作系统信息是:g++ (Ubuntu 11.4.0-1ubuntu1~22.04) 11.4.0
0赞 Caleth 9/19/2023
@AndreasHadjigeorgiou不,来自 Ranges v3 库,并非所有库都是标准化的chunk