提问人:Jamie 提问时间:9/20/2023 更新时间:9/21/2023 访问量:75
如何在 C/C++ 中启用带有多线程 FFTW 的 OpenMP?
How to enable OpenMP with Multi-threaded FFTW in C/C++?
问:
当我在他们的文档中读到您应该创建一次所有 fftw 计划并多次执行时,我正在努力在 C/C++ 中缓慢实现我的 FFTW,我能够正确实现。现在,我正在尝试将内置的 openmp 功能与并行 FFTW 一起使用。我跟着他们关于 fftw thread-saftey 的文档:
The only thread-safe routine in FFTW is fftw_execute.
All other routines (e.g. the planner) should only be called from one thread at a time.
So, for example, you can wrap a semaphore lock around any calls to the planner.
这就是我使用但是,现在的问题是我不确定如何或在哪里调用以下内容:std::lock_guard<std::mutex> lock(...
Third, before creating a plan that you want to parallelize, you should call:
void fftw_plan_with_nthreads(int nthreads);
我现在拥有的任何示例代码似乎都不起作用或被“并行化”!
代码示例:
#include"omp.h"
#include <mutex>
#include <thread>
#include "fftw3.h"
static const int nx = 128;
static const int ny = 128;
static const int ncomp = 2;
static const int nyk = ny/2 + 1;
class MyFftwClass {
public:
MyFftwClass(void);
~MyFftwClass(void);
void execute(double rArr[], double cArr[][ncomp]);
private:
static fftw_plan s_plan; // <-- shared by all instances
double *m_buffer_in;
fftw_complex *m_buffer_out;
};
class MyFftwClass1 {
public:
MyFftwClass1(void);
~MyFftwClass1(void);
void execute1(double cArr[][ncomp],double rArr[]);
private:
static fftw_plan s_plan1; // <-- shared by all instances
fftw_complex *m_buffer_in1;
double *m_buffer_out1;
};
int main(){
fftw_init_threads(); //before calling any FFTW routines, you should call the function
//This function, which should only be called once (probably in your main() function), performs any one-time initialization required to use threads on your system.
int nThreads =1;//4;
omp_set_num_threads(nThreads);
double *Function;
Function= (double*) fftw_malloc(nx*ny*sizeof(double));
for (int i = 0; i < nx; i++){
for (int j = 0; j < ny; j++){
Function[j + ny*i] = //some initialization;
}
}
fftw_complex *Functionk;
Functionk= (fftw_complex*) fftw_malloc(nx*nyk*sizeof(fftw_complex));
memset(Functionk, 42, nx*nyk* sizeof(fftw_complex));
MyFftwClass r2c1; //declare r2c1 of type MyFftwClass
r2c1.execute(Function,Functionk);
double *Out;
Out = (double*) fftw_malloc(nx*ny*sizeof(double));
memset(Out, 42, nx*ny* sizeof(double));
MyFftwClass1 c2r1; //declare r2c1 of type MyFftwClass
c2r1.execute1(Functionk,Out);
//fftw_free stuff
}
std::mutex g_my_fftw_mutex; // <-- must lock before using any fftw_*() function, except for fftw_execute*()
MyFftwClass1::MyFftwClass1(void)
{
// serialize the initialization!
std::lock_guard<std::mutex> lock(g_my_fftw_mutex);
// allocate separate buffers for each instance
m_buffer_in1 = fftw_alloc_complex(nx * nyk);
m_buffer_out1 = fftw_alloc_real(nx * ny);
if (!(m_buffer_in1 && m_buffer_out1))
{
throw new std::runtime_error("Failed to allocate memory!");
}
// create plan *once* for all instances/threads
if (!s_plan1)
{
s_plan1 = fftw_plan_dft_c2r_2d(nx, ny, m_buffer_in1, m_buffer_out1, FFTW_PATIENT);
if (!s_plan1)
{
throw new std::runtime_error("Failed to create plan!");
}
}
}
MyFftwClass1::~MyFftwClass1(void)
{
std::lock_guard<std::mutex> lock(g_my_fftw_mutex);
fftw_destroy_plan(s_plan1);
fftw_free(m_buffer_in1);
fftw_free(m_buffer_out1);
}
void MyFftwClass1::execute1(double cArr[][ncomp],double rArr[]) //void MyFftwClass::execute(double *const in, fftw_complex *const out)
{
// No serialization is needed here
memcpy(m_buffer_in1,cArr, sizeof(fftw_complex) * nx*(nyk));
fftw_execute_dft_c2r(s_plan1, m_buffer_in1, m_buffer_out1); //instead of fftw_excute(plan)
memcpy(rArr, m_buffer_out1,sizeof(double) * nx*ny);
//(rArr, 1.0 / (nx*ny), rArr); //renormalize
}
fftw_plan MyFftwClass1::s_plan1 = NULL;
std::mutex g_my_fftw_mutex; // <-- must lock before using any fftw_*() function, except for fftw_execute*()
MyFftwClass::MyFftwClass(void)
//int r2cfft_initialize(r2cfft_t *const ctx, const std::size_t nx, const std::size_t ny)
{
// serialize the initialization!
std::lock_guard<std::mutex> lock(g_my_fftw_mutex);
//allocating buffers first before plan creating gets rid off seg fault error
// allocate separate buffers for each instance
m_buffer_in = fftw_alloc_real(nx * ny);
m_buffer_out = fftw_alloc_complex(nx * nyk);
if (!(m_buffer_in && m_buffer_out))
{
throw new std::runtime_error("Failed to allocate memory!");
}
// create plan *once* for all instances/threads
if (!s_plan)
{
s_plan = fftw_plan_dft_r2c_2d(nx, ny, m_buffer_in, m_buffer_out, FFTW_PATIENT);
if (!s_plan)
{
throw new std::runtime_error("Failed to create plan!");
}
}
}
MyFftwClass::~MyFftwClass(void)
{
std::lock_guard<std::mutex> lock(g_my_fftw_mutex);
fftw_free(m_buffer_in);
fftw_free(m_buffer_out);
fftw_destroy_plan(s_plan);
}
void MyFftwClass::execute(double rArr[], double cArr[][ncomp]) //void MyFftwClass::execute(double *const in, fftw_complex *const out)
{
// No serialization is needed here
memcpy(m_buffer_in, rArr, sizeof(double) * nx*ny);
fftw_execute_dft_r2c(s_plan, m_buffer_in, m_buffer_out); //instead of fftw_excute(plan)
memcpy(cArr, m_buffer_out, sizeof(fftw_complex) * nx*(nyk));
}
fftw_plan MyFftwClass::s_plan = NULL;
这看起来是线程安全的和/或并行的吗?谢谢!
答:
两件事:
您计划使用 .这是非常昂贵的,对于线程来说更是如此。你需要经常重用计划,做很多很多的转换;或者您需要节省计划的智慧,以便成本只发生一次
FFTW_PATIENT
您的 FFT 相当小。可能无法并行化它
最小修复
这是一个最小修改的版本。我认为还有很多问题已经过时了,或者可以做得更好。但是,让我们首先关注这一点:
int main(){
fftw_init_threads();
fftw_import_system_wisdom();
fftw_import_wisdom_from_filename("/var/tmp/my_application_fftw.wisdom");
int nThreads =1;//4;
omp_set_num_threads(nThreads);
fftw_plan_with_nthreads(nThreads);
double *Function;
Function= (double*) fftw_malloc(nx*ny*sizeof(double));
for (int i = 0; i < nx; i++){
for (int j = 0; j < ny; j++){
Function[j + ny*i] = 1.;//some initialization;
}
}
fftw_complex *Functionk;
Functionk= (fftw_complex*) fftw_malloc(nx*nyk*sizeof(fftw_complex));
memset(Functionk, 42, nx*nyk* sizeof(fftw_complex));
double *Out;
Out = (double*) fftw_malloc(nx*ny*sizeof(double));
memset(Out, 42, nx*ny* sizeof(double));
MyFftwClass r2c1; //declare r2c1 of type MyFftwClass
MyFftwClass1 c2r1; //declare r2c1 of type MyFftwClass
for(int i = 0; i < 100000; ++i) {
r2c1.execute(Function,Functionk);
c2r1.execute1(Functionk,Out);
}
fftw_export_wisdom_to_filename("/var/tmp/my_application_fftw.wisdom");
//fftw_free stuff
}
编译方式(或者如果不可用)g++ -O3 -lfftw -lfftw_omp -fopenmp
-lfftw_threads
fftw_omp
请注意,我是如何重复使用相同的两个计划 100,000 次的。您实际上需要使用这些计划进行大量运行。我们跨应用程序调用缓存规划结果。最终应用程序需要更多地考虑要放置该文件的位置。例如,在群集上,它应该位于主目录中,而不是某个计算节点的临时目录中。
整个互斥锁的东西与你编写的测试用例无关。仅当从多个线程中并行创建计划时,才需要这样做。您可以在代码的其他部分执行此操作,但此处不行。它不会损害测试用例,但也没有必要。
基准测试结果
如果我使用 128x128 大小的 FFT 和 100,000 次迭代运行它,则 16 个线程在第一次运行(有计划)时需要 19 秒,在以后的所有运行中需要 7 秒。一个线程最初需要 5.8 秒,缓存的智慧需要 4.6 秒。
如果我使用 1024x1024 大小的 FFT、16 线程、1000 次迭代运行它,我第一次得到 3 分钟的运行时间,然后第二次得到 5 秒;使用所有线程。只有一个线程,第一次运行需要 34 秒,所有其他运行需要 9 秒。
综上所述
您的 FFT 大小可能太小,无法有效利用多线程。通过使用fftw_plan_many_dft_r2c
使用一批独立的小型 FFT,或者从外部部分调用许多独立的单线程 FFT,您将获得更好的结果pragma omp parallel
评论
FFTW_PATIENT
if (!s_plan){.
FFTW_PATIENT