如何在 C/C++ 中启用带有多线程 FFTW 的 OpenMP?

How to enable OpenMP with Multi-threaded FFTW in C/C++?

提问人:Jamie 提问时间:9/20/2023 更新时间:9/21/2023 访问量:75

问:

当我在他们的文档中读到您应该创建一次所有 fftw 计划并多次执行时,我正在努力在 C/C++ 中缓慢实现我的 FFTW,我能够正确实现。现在,我正在尝试将内置的 openmp 功能与并行 FFTW 一起使用。我跟着他们关于 fftw thread-saftey 的文档:

The only thread-safe routine in FFTW is fftw_execute.
All other routines (e.g. the planner) should only be called from one thread at a time.
So, for example, you can wrap a semaphore lock around any calls to the planner.

这就是我使用但是,现在的问题是我不确定如何或在哪里调用以下内容:std::lock_guard<std::mutex> lock(...

Third, before creating a plan that you want to parallelize, you should call:

void fftw_plan_with_nthreads(int nthreads);

我现在拥有的任何示例代码似乎都不起作用或被“并行化”!

代码示例:

#include"omp.h"  
#include <mutex>
#include <thread>
#include "fftw3.h"

static const int nx = 128;
static const int ny = 128;
static const int ncomp = 2;
static const int nyk = ny/2 + 1;

class MyFftwClass {
public:
    MyFftwClass(void);
    ~MyFftwClass(void);
    void execute(double rArr[], double cArr[][ncomp]);

private:
    static fftw_plan s_plan; // <-- shared by all instances
    double *m_buffer_in;
    fftw_complex *m_buffer_out;
};


class MyFftwClass1 {
public:
    MyFftwClass1(void);
    ~MyFftwClass1(void);
    void execute1(double cArr[][ncomp],double rArr[]);

private:
    static fftw_plan s_plan1; // <-- shared by all instances
    fftw_complex *m_buffer_in1;
    double *m_buffer_out1;
};

int main(){ 
    
fftw_init_threads(); //before calling any FFTW routines, you should call the function
//This function, which should only be called once (probably in your main() function), performs any one-time initialization required to use threads on your system.
int nThreads =1;//4;
omp_set_num_threads(nThreads);

double *Function;
Function= (double*) fftw_malloc(nx*ny*sizeof(double)); 
    for (int i = 0; i < nx; i++){
        for (int j = 0; j < ny; j++){
          Function[j + ny*i]  = //some initialization; 
            
        }
    }

fftw_complex *Functionk;
Functionk= (fftw_complex*) fftw_malloc(nx*nyk*sizeof(fftw_complex)); 
memset(Functionk, 42, nx*nyk* sizeof(fftw_complex));

MyFftwClass r2c1; //declare r2c1 of type MyFftwClass
r2c1.execute(Function,Functionk);

double *Out;
Out = (double*) fftw_malloc(nx*ny*sizeof(double)); 
memset(Out, 42, nx*ny* sizeof(double));

MyFftwClass1 c2r1; //declare r2c1 of type MyFftwClass
c2r1.execute1(Functionk,Out);
//fftw_free stuff
}
std::mutex g_my_fftw_mutex; // <-- must lock before using any fftw_*() function, except for fftw_execute*()
MyFftwClass1::MyFftwClass1(void)
{
    // serialize the initialization!
    std::lock_guard<std::mutex> lock(g_my_fftw_mutex);

    // allocate separate buffers for each instance
    m_buffer_in1 = fftw_alloc_complex(nx * nyk);
    m_buffer_out1 = fftw_alloc_real(nx * ny); 
    if (!(m_buffer_in1 && m_buffer_out1))
    {
        throw new std::runtime_error("Failed to allocate memory!");
    }

    // create plan *once* for all instances/threads
    if (!s_plan1)
    {
        s_plan1 = fftw_plan_dft_c2r_2d(nx, ny, m_buffer_in1, m_buffer_out1, FFTW_PATIENT);
        if (!s_plan1)
        {
            throw new std::runtime_error("Failed to create plan!");
        }
    }

}
MyFftwClass1::~MyFftwClass1(void)
{
    std::lock_guard<std::mutex> lock(g_my_fftw_mutex);
    fftw_destroy_plan(s_plan1);
    fftw_free(m_buffer_in1);
    fftw_free(m_buffer_out1);
}

void MyFftwClass1::execute1(double cArr[][ncomp],double rArr[]) //void MyFftwClass::execute(double *const in, fftw_complex *const out)
{
    // No serialization is needed here
    memcpy(m_buffer_in1,cArr, sizeof(fftw_complex) * nx*(nyk));
    fftw_execute_dft_c2r(s_plan1, m_buffer_in1, m_buffer_out1); //instead of fftw_excute(plan)
    memcpy(rArr, m_buffer_out1,sizeof(double) * nx*ny);
    //(rArr, 1.0 / (nx*ny), rArr); //renormalize 
}

fftw_plan MyFftwClass1::s_plan1 = NULL;

std::mutex g_my_fftw_mutex; // <-- must lock before using any fftw_*() function, except for fftw_execute*()
MyFftwClass::MyFftwClass(void)
//int r2cfft_initialize(r2cfft_t *const ctx, const std::size_t nx, const std::size_t ny)
{
    // serialize the initialization!
    std::lock_guard<std::mutex> lock(g_my_fftw_mutex);

    //allocating buffers first before plan creating gets rid off seg fault error

    // allocate separate buffers for each instance
    m_buffer_in = fftw_alloc_real(nx * ny); 
    m_buffer_out = fftw_alloc_complex(nx * nyk);
    if (!(m_buffer_in && m_buffer_out))
    {
        throw new std::runtime_error("Failed to allocate memory!");
    }

    // create plan *once* for all instances/threads
   if (!s_plan)
    {
        s_plan = fftw_plan_dft_r2c_2d(nx, ny, m_buffer_in, m_buffer_out, FFTW_PATIENT);
        if (!s_plan)
        {
            throw new std::runtime_error("Failed to create plan!");
        }
    }

    
}

MyFftwClass::~MyFftwClass(void)
{
    std::lock_guard<std::mutex> lock(g_my_fftw_mutex);
    fftw_free(m_buffer_in);
    fftw_free(m_buffer_out);
    fftw_destroy_plan(s_plan);
}

void MyFftwClass::execute(double rArr[], double cArr[][ncomp]) //void MyFftwClass::execute(double *const in, fftw_complex *const out)
{
    // No serialization is needed here
    memcpy(m_buffer_in, rArr,  sizeof(double) * nx*ny);
    fftw_execute_dft_r2c(s_plan, m_buffer_in, m_buffer_out); //instead of fftw_excute(plan)
    memcpy(cArr, m_buffer_out, sizeof(fftw_complex) * nx*(nyk));
}

fftw_plan MyFftwClass::s_plan = NULL;

这看起来是线程安全的和/或并行的吗?谢谢!

C++ 多线程 OpenMP FFTW

评论

1赞 Homer512 9/20/2023
对不起,我误读了,因此删除了我的评论。您通常在创建计划之前直接调用它,以便该位置是正确的。您看到速度放缓的原因是您计划使用 .通过激活多线程,这将测试更多的组合以找到最佳计划。将结果缓存在 fftw 智慧中可能是个好主意FFTW_PATIENT
2赞 Homer512 9/20/2023
另一个问题是,你甚至需要那个互斥锁吗?您发布的代码没有任何自己的线程。它只是希望 fftw 在内部使用线程。仅当您自己的代码同时从多个线程调用 FFTW 时,才需要互斥锁。
3赞 Homer512 9/20/2023
还有其他几个问题。例如,你的类的析构函数破坏了他们的计划。但这些计划是静态的。因此,如果有两个相同类的实例,则 a) 将同一计划销毁两次(双重释放错误),并且 b) 可能会销毁仍在使用中的计划
0赞 Jamie 9/21/2023
@Homer512 “你通常在创建计划之前直接调用它,以便该位置是正确的”,那么在if语句之前,我称之为??另外,我对“fftw wisdom”不是很熟悉,我可以用它来代替吗?if (!s_plan){.FFTW_PATIENT
1赞 PierU 9/21/2023
您的代码不会创建任何并行区域,因此不会自行生成线程。这就是@Homer512的意思。线程可能是在 fftw 函数中的较低级别创建和管理的,因此您通常不必担心管理它们。

答:

1赞 Homer512 9/21/2023 #1

两件事:

  1. 您计划使用 .这是非常昂贵的,对于线程来说更是如此。你需要经常重用计划,做很多很多的转换;或者您需要节省计划的智慧,以便成本只发生一次FFTW_PATIENT

  2. 您的 FFT 相当小。可能无法并行化它

最小修复

这是一个最小修改的版本。我认为还有很多问题已经过时了,或者可以做得更好。但是,让我们首先关注这一点:

int main(){

    fftw_init_threads();
    fftw_import_system_wisdom();
    fftw_import_wisdom_from_filename("/var/tmp/my_application_fftw.wisdom");
    int nThreads =1;//4;
    omp_set_num_threads(nThreads);
    fftw_plan_with_nthreads(nThreads);


    double *Function;
    Function= (double*) fftw_malloc(nx*ny*sizeof(double));
    for (int i = 0; i < nx; i++){
        for (int j = 0; j < ny; j++){
            Function[j + ny*i]  = 1.;//some initialization;
        }
    }

    fftw_complex *Functionk;
    Functionk= (fftw_complex*) fftw_malloc(nx*nyk*sizeof(fftw_complex));
    memset(Functionk, 42, nx*nyk* sizeof(fftw_complex));

    double *Out;
    Out = (double*) fftw_malloc(nx*ny*sizeof(double));
    memset(Out, 42, nx*ny* sizeof(double));
    MyFftwClass r2c1; //declare r2c1 of type MyFftwClass
    MyFftwClass1 c2r1; //declare r2c1 of type MyFftwClass
    for(int i = 0; i < 100000; ++i) {
        r2c1.execute(Function,Functionk);
        c2r1.execute1(Functionk,Out);
    }
    fftw_export_wisdom_to_filename("/var/tmp/my_application_fftw.wisdom");
    //fftw_free stuff
}

编译方式(或者如果不可用)g++ -O3 -lfftw -lfftw_omp -fopenmp-lfftw_threadsfftw_omp

请注意,我是如何重复使用相同的两个计划 100,000 次的。您实际上需要使用这些计划进行大量运行。我们跨应用程序调用缓存规划结果。最终应用程序需要更多地考虑要放置该文件的位置。例如,在群集上,它应该位于主目录中,而不是某个计算节点的临时目录中。

整个互斥锁的东西与你编写的测试用例无关。仅当从多个线程中并行创建计划时,才需要这样做。您可以在代码的其他部分执行此操作,但此处不行。它不会损害测试用例,但也没有必要。

基准测试结果

如果我使用 128x128 大小的 FFT 和 100,000 次迭代运行它,则 16 个线程在第一次运行(有计划)时需要 19 秒,在以后的所有运行中需要 7 秒。一个线程最初需要 5.8 秒,缓存的智慧需要 4.6 秒。

如果我使用 1024x1024 大小的 FFT、16 线程、1000 次迭代运行它,我第一次得到 3 分钟的运行时间,然后第二次得到 5 秒;使用所有线程。只有一个线程,第一次运行需要 34 秒,所有其他运行需要 9 秒。

综上所述

您的 FFT 大小可能太小,无法有效利用多线程。通过使用fftw_plan_many_dft_r2c使用一批独立的小型 FFT,或者从外部部分调用许多独立的单线程 FFT,您将获得更好的结果pragma omp parallel

评论

0赞 Jamie 9/22/2023
谢谢!这非常有帮助,我会说我一直在为单次迭代测试它,我的代码在技术上运行了数千次迭代的 10 次,我猜我的初始测试不是要走的路。正因为如此,我认为我现在更倾向于使用多线程,并更严格地测试我的代码。