Parallel KNN Implementation using MPI giving errors using 3 processes and working fine with 1 process

Asked by: Bhanu Teja Pogiri    Asked: 11/16/2023    Updated: 11/16/2023    Views: 18

Q:

I am implementing a parallel KNN algorithm using MPI.

The code files I am using, modified from a git repository, are as follows:

I have the files X_train.csv, X_test.csv, y_train.csv and y_test.csv in the same directory, with shapes X_train: (8157, 15), y_train: (8157,), X_test: (1024, 15), y_test: (1024,).
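
(For reference, the fscanf("%f%*c", ...) read loops below assume each CSV is plain comma-separated numbers with no header row: X_train.csv has one line per example with 15 values, for example a hypothetical row

0.53,1.0,0.0,0.27,0.91,0.0,1.0,0.44,0.12,0.68,0.0,1.0,0.33,0.75,0.08

and y_train.csv has one label (0 or 1) per line. The values shown here are illustrative only.)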

config.h :-

#ifndef CONFIG_H
#define CONFIG_H

#define NTRAIN 8157                     // number of training examples
#define NTEST 1024                      // number of testing examples 
#define NFEATURES 15                     // number of features (columns) in the training examples
#define NCLASSES 2                      // number of classes/labels

#define X_TRAIN_PATH "./X_train.csv"    // path to X train .csv file
#define Y_TRAIN_PATH "./y_train.csv"    // path to y train .csv file
#define X_TEST_PATH "./X_test.csv"      // path to X test .csv file
#define Y_TEST_PATH "./y_test.csv"      // path to y test .csv file

#define K 32                             // the hyperparameter K in KNN algorithm
#define TOPN 2                           // Print the closest top N classes

// Array of all classes/label names 
char class[NCLASSES][25] = {"0", "1"};

#endif

helper.h :-


#ifndef HELPER_H
#define HELPER_H

void checkFile(FILE *f)
{
    if (f == NULL)
    {
        printf("Error while reading file\n");
        exit(1);
    }
}

int *getIntMat(int m, int n)
{
    int *mat = NULL;
    mat = (int *)calloc(m * n, sizeof(int));

    return mat;
}

float *getFloatMat(int m, int n)
{
    float *mat = NULL;
    mat = (float *)calloc(m * n, sizeof(float));

    return mat;
}

float getMax(float *x, int n)
{
    int i;
    float max = x[0];
    int maxIndex = 0;

    for (i = 0; i < n; i++)
    {
        if (x[i] >= max)
        {
            max = x[i];
            maxIndex = i;
        }
    }

    return (float)maxIndex;
}

#endif

mergeSort.h :-

#ifndef M_SORT
#define M_SORT

void merge(float arr[], int l, int m, int r, float *y) 
{ 
    int i, j, k; 
    int n1 = m - l + 1; 
    int n2 =  r - m; 
  
    /* create temp arrays */
    float *L = getFloatMat(n1, 1);
    float *R = getFloatMat(n2, 1); 

    float *Ly = getFloatMat(n1, 1);
    float *Ry = getFloatMat(n2, 1);
  
    /* Copy data to temp arrays L[] and R[] */
    for (i = 0; i < n1; i++)
    {
        L[i] = arr[l + i];
        Ly[i] = y[l + i];
    }

    for (j = 0; j < n2; j++)
    {
        R[j] = arr[m + 1+ j];
        Ry[j] = y[m + 1 + j];
    } 
  
    /* Merge the temp arrays back into arr[l..r]*/
    i = 0; // Initial index of first subarray 
    j = 0; // Initial index of second subarray 
    k = l; // Initial index of merged subarray 
    while (i < n1 && j < n2) 
    { 
        if (L[i] <= R[j]) 
        { 
            arr[k] = L[i];
            y[k] = Ly[i];
            i++; 
        } 

        else
        { 
            arr[k] = R[j]; 
            y[k] = Ry[j];

            j++; 
        }

        k++; 
    } 
  
    /* Copy the remaining elements of L[], if there 
       are any */
    while (i < n1) 
    { 
        arr[k] = L[i];
        y[k] = Ly[i];

        i++; 
        k++; 
    } 
  
    /* Copy the remaining elements of R[], if there 
       are any */
    while (j < n2) 
    { 
        arr[k] = R[j]; 
        y[k] = Ry[j];

        j++; 
        k++; 
    } 

    free (L);
    free (R);

    free (Ly);
    free (Ry);
} 
  
/* l is for left index and r is right index of the 
   sub-array of arr to be sorted */

void mergeSort(float arr[], int l, int r, float *y) 
{ 
    if (l < r) 
    { 
        // Same as (l+r)/2, but avoids overflow for 
        // large l and h 
        int m = l+(r-l)/2; 
  
        // Sort first and second halves 
        mergeSort(arr, l, m, y); 
        mergeSort(arr, m+1, r, y); 
  
        merge(arr, l, m, r, y); 
    } 
} 

void printArray(float A[], int size) 
{ 
    int i; 
    for (i=0; i < size; i++) 
        printf("%f ", A[i]); 
    printf("\n"); 
} 

#endif

And finally, knnInMPI.c :-

#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <mpi.h>
#include <string.h>
#include "helper.h"
#include "mergeSort.h"

// config file, make changes here for different dataset
#include "config.h"

void mpiInitialise(int *size, int *rank)
{
    MPI_Init(NULL, NULL);
    MPI_Comm_rank(MPI_COMM_WORLD, rank);
    MPI_Comm_size(MPI_COMM_WORLD, size);
}

float *initFeatures(char path[])
{
    int index = 0;
    FILE *f  = NULL;
    float *mat = NULL;

    mat = getFloatMat(NTRAIN, NFEATURES);

    f = fopen(path, "r");
    checkFile(f);

    while (fscanf(f, "%f%*c", &mat[index]) == 1) //%*c ignores the comma while reading the CSV
        index++;

    fclose(f);
    return mat;
}

float *initLabels(char path[])
{
    int index = 0;
    FILE *f  = NULL;
    float *mat = NULL;

    mat = getFloatMat(NTRAIN, 1);

    f = fopen(path, "r");
    checkFile(f);

    while (fscanf(f, "%f%*c", &mat[index]) == 1)
        index++;

    fclose(f);
    return mat;
}

// Function to calculate Euclidean distance between two points
float calculateDistance(float *point1, float *point2, int features)
{
    float distance = 0.0;
    for (int i = 0; i < features; i++)
    {
        distance += (point1[i] - point2[i]) * (point1[i] - point2[i]);
    }
    return sqrt(distance);
}

// Function to perform majority voting and return the majority class label
int majorityVote(float *labels, int size)
{
    int *classCounts = (int *)calloc(NCLASSES, sizeof(int));

    for (int i = 0; i < size; i++)
    {
        int label = (int)labels[i];
        classCounts[label]++;
    }

    int maxCount = 0;
    int majorityClass = -1;

    for (int i = 0; i < NCLASSES; i++)
    {
        if (classCounts[i] > maxCount)
        {
            maxCount = classCounts[i];
            majorityClass = i;
        }
    }

    free(classCounts);
    return majorityClass;
}

void knn(char *X_train_path, char *y_train_path, char *X_test_path, char *y_test_path)
{
    printf("1");
    float *X_train;
    float *y_train;
    float *X_test;
    float *y_test;
    double t1, t2;
    int size, rank;
    printf("Started");
    mpiInitialise(&size, &rank);

    if (rank == 0)
    {
        X_train = initFeatures(X_train_path);
        y_train = initLabels(y_train_path);
    }

    X_test = initFeatures(X_test_path);
    y_test = initLabels(y_test_path);

    printf("Datasets loaded");

    if (rank == 0)
        t1 = MPI_Wtime();

    // Split the training set evenly across the MPI processes (integer division)
    int nrows_per_process = NTRAIN / size;
    int ndata_per_process = nrows_per_process * NFEATURES;

    float *pdata = getFloatMat(ndata_per_process, 1);
    float *pdistance = getFloatMat(nrows_per_process, 1);
    float *plabels = getFloatMat(nrows_per_process, 1);

    int *assignedLabels = (int *)malloc(NTEST * sizeof(int));

    // Distribute each rank's block of training feature rows from rank 0
    MPI_Scatter(X_train, ndata_per_process, MPI_FLOAT, pdata, ndata_per_process, MPI_FLOAT, 0, MPI_COMM_WORLD);

    for (int i = 0; i < NTEST; i++)
    {
        MPI_Scatter(y_train, nrows_per_process, MPI_FLOAT, plabels, nrows_per_process, MPI_FLOAT, 0, MPI_COMM_WORLD);

        float *x = getFloatMat(NFEATURES, 1);

        for (int j = 0; j < NFEATURES; j++)
            x[j] = X_test[i * NFEATURES + j];

        // Calculate distance
        for (int j = 0; j < nrows_per_process; j++)
        {
            pdistance[j] = calculateDistance(&pdata[j * NFEATURES], x, NFEATURES);
        }

        // Sort the distance array
        mergeSort(pdistance, 0, nrows_per_process - 1, plabels);

        // Gather each rank's locally sorted distances and labels back on rank 0
        MPI_Gather(pdistance, nrows_per_process, MPI_FLOAT, pdistance, nrows_per_process, MPI_FLOAT, 0, MPI_COMM_WORLD);
        MPI_Gather(plabels, nrows_per_process, MPI_FLOAT, plabels, nrows_per_process, MPI_FLOAT, 0, MPI_COMM_WORLD);

        if (rank == 0)
        {
            // Sort the global distance array
            mergeSort(pdistance, 0, NTRAIN - 1, plabels);

            // Perform majority voting and get the assigned label
            assignedLabels[i] = majorityVote(plabels, K);
            printf("%d) Predicted label: %d   True label: %d\n\n", i, assignedLabels[i], (int)y_test[i]);
        }

        free(x);
    }

    free(pdata);
    free(pdistance);
    free(plabels);

    int *allAssignedLabels = NULL;

    if (rank == 0)
    {
        printf("reached here 2");
        allAssignedLabels = (int *)malloc(NTEST * sizeof(int));
    }

    // Gather assigned labels from all processes to process 0
    MPI_Gather(assignedLabels, NTEST, MPI_INT, allAssignedLabels, NTEST, MPI_INT, 0, MPI_COMM_WORLD);

    if (rank == 0)
    {
        // Evaluate the classifier by the accuracy measurement on Dtest
        int correctCount = 0;
        for (int i = 0; i < NTEST; i++)
        {
            if (allAssignedLabels[i] == (int)y_test[i])
            {
                correctCount++;
            }
        }

        double accuracy = (double)correctCount / NTEST;
        printf("Accuracy: %f\n", accuracy);

        free(allAssignedLabels);
    }

    if (rank == 0)
        t2 = MPI_Wtime();

    if (rank == 0)
    {
        printf("Time for execution (%d Processors): %f\n", size, t2 - t1);
//        free(X_train);
//        free(y_train);
    }

//    free(X_test);
//    free(y_test);
//    free(assignedLabels);
    MPI_Finalize();
}

int main()
{
    knn(X_TRAIN_PATH, Y_TRAIN_PATH, X_TEST_PATH, Y_TEST_PATH);
    return 0;
}

I used these commands:

'mpicc -o knnInMPI knnInMPI.c -lm', and then 'mpirun -np 3 ./knnInMPI'.

When I run 'mpirun -np 1 ./knnInMPI', I get no errors and the code executes as expected, but when I replace 1 with 3 and run 'mpirun -np 3 ./knnInMPI', I get this error:

malloc(): corrupted top size
[bhanu-VirtualBox:02272] *** Process received signal ***
[bhanu-VirtualBox:02272] Signal: Aborted (6)
[bhanu-VirtualBox:02272] Signal code:  (-6)

Where am I going wrong? Could someone help me with the code?

Thanks

c parallel-processing malloc mpi knn

A: No answers yet