提问人:Andreas Hadjigeorgiou 提问时间:9/17/2023 最后编辑:user17732522Andreas Hadjigeorgiou 更新时间:9/18/2023 访问量:81
如何使用例如“std::transform”对并行 C 样式的矩阵向量乘法实现进行现代化改造?
How to modernize a parallel C-style matrix-vector multiplication implementation using e.g. `std::transform`?
问:
我正在尝试用更现代的方法来调整我的编程实践,这些方法利用了 STL 容器、算法、执行策略等。
我开发了一个小型测试来试验对基本矩阵向量乘法的 C 风格实现进行现代化改造:,它与 OpenMP 并行化。在这里,遵循高级主程序,我在其中设置问题的参数,用一些随机值初始化向量,然后以两种不同的方法执行矩阵向量乘法。C[N] = A[N,M] * B[N]
int main(){
size_t N(9), M(5);
std::vector<double> A(N*M), B(M), C(N);
fill_random(A, 1.0, 1.0); // if min==max the vector is filled with that particular value, otherwise, it's randomly sampled with values in the range [min,max]
fill_random(B, 1.0, 1.0);
// Run for-loop based routine (parallel OpenMP)
std::fill(C.begin(), C.end(), 0.0);
bool err_for = run_for_loop_based_solution(A, B, C, N, M);
if(!err_for)
std::cout << "For-loop solution executed successfully!" << std::endl;
std::for_each(C.begin(), C.end(), [](double x){
std::cout << x << " ";
});
std::cout << std::endl;
// Run transform-based solution
std::fill(C.begin(), C.end(), 0.0);
bool err_task = run_transform_based_based_solution(A, B, C, N, M);
if(!err_task)
std::cout << "Transform-based solution executed successfully!" << std::endl;
std::for_each(C.begin(), C.end(), [](double x){
std::cout << x << " ";
});
std::cout << std::endl;
return 0;
}
现在,让我们来看看这两个实现!
参考实现,即使用基于 for 循环的 C 样式实现开发。计算与 OpenMP 线程并行化,使得每个线程计算输出向量的一个或多个元素的值。简单!run_for_loop_based_solution(A, B, C, N, M);
C[N]
template<typename T>
bool run_for_loop_based_solution(std::vector<T> &A, std::vector<T> &B, std::vector<T> &C, size_t N, size_t M)
{
if (A.size() != N*M) return 1;
if (B.size() != M) return 1;
if (C.size() != N) return 1;
#pragma omp parallel for
for(size_t i=0; i<N; ++i)
for(size_t j=0; j<M; ++j)
C[i] += A[i*M + j] * B[j];
return 0;
}
现在,我想使用 开发一个实现,即 ,它提供了选择执行策略以启用并行性的选项,例如 .像这样的东西:std::transform
run_transform_based_based_solution(A, B, C, N, M);
std::execution::par
template<typename T>
bool run_transform_based_based_solution(std::vector<T> &A, std::vector<T> &B, std::vector<T> &C, size_t N, size_t M)
{
if (A.size() != N*M) return 1;
if (B.size() != M) return 1;
if (C.size() != N) return 1;
// std::transform(std::execution::par, C.begin(), C.end(), C.begin(), [&](double x){
// return ...
// });
return 0;
}
在这一点上我很挣扎,因为我无法理解如何在索引的主体或与每个元素的计算相关的范围内传播。是否可以按照这种方法(?)进行操作,或者除了方法之外,是否有其他选择?std::transform
std::transform
注意:要运行代码,只需将上述代码片段包含在单个文件中,然后使用 .此外,在开头包含以下代码片段:main.cpp
-std==c++17
#include <iostream>
#include <algorithm>
#include <vector>
#include <random>
#include <execution>
// fill a std::vector with random values in the range [min, max]. if min==max the vector is filled with that particular value.
template<typename T>
void fill_random(std::vector<T> &v, T min, T max){
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_real_distribution<T> dist(min, max);
std::transform(v.begin(), v.end(), v.begin(), [&](T x){
return dist(gen);
});
}
答:
要获取索引,您可以使用此答案的方法
std::transform(std::execution::par, C.begin(), C.end(), C.begin(),
[&A, &B, &C, M](T& i) -> T {
auto index = &i - &C[0]; // Calculate the row index
return std::inner_product(A.begin() + index * M, A.begin() + (index + 1) * M, B.begin(), i);
});
评论
-> T
每个子范围的计算是 with 的子范围的 std::inner_product
。如果你有一个块视图,那么它相当简单:C[i]
A
B
template<typename T, typename Execution>
bool run_transform_based_based_solution(Execution e, std::vector<T> &A, std::vector<T> &B, std::vector<T> &C, size_t N, size_t M)
{
if (A.size() != N*M) return 1;
if (B.size() != M) return 1;
if (C.size() != N) return 1;
const chunks = ranges::views::chunk(A, M);
std::transform(e, chunks.begin(), chunks.end(), C.begin(), [e, &B](auto chunk){
return std::inner_product(e, chunk.begin(), chunk.end(), B.begin(), B.end());
});
return 0;
}
评论
const auto
std::
chunks
error: ‘chunk’ is not a member of ‘std::ranges::views’
g++ (Ubuntu 11.4.0-1ubuntu1~22.04) 11.4.0
chunk
评论