Asked by: Bhanu Teja Pogiri | Asked: 11/16/2023 | Updated: 11/16/2023 | Views: 18
Parallel KNN Implementation using MPI giving errors using 3 processes and working fine with 1 process
Q:
I am implementing a parallel KNN algorithm using MPI.
The modified code files I am using (adapted from a git repository) are given below.
I have X_train.csv, X_test.csv, y_train.csv, and y_test.csv in the same directory, with shapes X_train: (8157, 15), y_train: (8157,), X_test: (1024, 15), y_test: (1024,).
config.h :-
#ifndef CONFIG_H
#define CONFIG_H
#define NTRAIN 8157 // number of training examples
#define NTEST 1024 // number of testing examples
#define NFEATURES 15 // number of features (columns) in the training examples
#define NCLASSES 2 // number of classes/labels
#define X_TRAIN_PATH "./X_train.csv" // path to X train .csv file
#define Y_TRAIN_PATH "./y_train.csv" // path to y train .csv file
#define X_TEST_PATH "./X_test.csv" // path to X test .csv file
#define Y_TEST_PATH "./y_test.csv" // path to y test .csv file
#define K 32 // the hyperparameter K in KNN algorithm
#define TOPN 2 // Print the closest top N classes
// Array of all classes/label names
char class[NCLASSES][25] = {"0", "1"};
#endif
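(As a side note, to make sure the shapes above really match the macros in config.h, I sometimes run a tiny standalone checker like the sketch below. The file name checkCounts.c and the countValues() helper are made up for illustration and are not part of the repository; it just counts values with the same "%f%*c" parsing used by the loaders further down.)
checkCounts.c (illustrative sketch only):
#include <stdio.h>
#include <stdlib.h>
#include "config.h"

// Count comma/newline-separated float values in a .csv file
static long countValues(const char *path)
{
    FILE *f = fopen(path, "r");
    if (f == NULL)
    {
        printf("Cannot open %s\n", path);
        exit(1);
    }
    long count = 0;
    float v;
    while (fscanf(f, "%f%*c", &v) == 1)
        count++;
    fclose(f);
    return count;
}

int main(void)
{
    printf("X_train: %ld values (expected %d)\n", countValues(X_TRAIN_PATH), NTRAIN * NFEATURES);
    printf("y_train: %ld values (expected %d)\n", countValues(Y_TRAIN_PATH), NTRAIN);
    printf("X_test:  %ld values (expected %d)\n", countValues(X_TEST_PATH), NTEST * NFEATURES);
    printf("y_test:  %ld values (expected %d)\n", countValues(Y_TEST_PATH), NTEST);
    return 0;
}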
helper.h :-
#ifndef HELPER_H
#define HELPER_H
void checkFile(FILE *f)
{
    if (f == NULL)
    {
        printf("Error while reading file\n");
        exit(1);
    }
}

int *getIntMat(int m, int n)
{
    int *mat = NULL;
    mat = (int *)calloc(m * n, sizeof(int));
    return mat;
}

float *getFloatMat(int m, int n)
{
    float *mat = NULL;
    mat = (float *)calloc(m * n, sizeof(float));
    return mat;
}

float getMax(float *x, int n)
{
    int i;
    float max = x[0];
    int maxIndex = 0;
    for (i = 0; i < n; i++)
    {
        if (x[i] >= max)
        {
            max = x[i];
            maxIndex = i;
        }
    }
    return (float)maxIndex;
}
#endif
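(A quick note on how I read these helpers: matrices are stored flat in row-major order, so element (i, j) of an m x n matrix from getFloatMat() is mat[i * n + j], and getMax() returns the index of the largest value, cast to float, rather than the value itself. A small standalone usage sketch with made-up numbers, not part of the repository:)
#include <stdio.h>
#include <stdlib.h>
#include "helper.h"

int main(void)
{
    int m = 2, n = 3;
    float *mat = getFloatMat(m, n);   // zero-initialised 2 x 3 matrix, stored row-major
    mat[0 * n + 2] = 5.0f;            // element (0, 2)
    mat[1 * n + 1] = 9.0f;            // element (1, 1)
    // getMax() returns the INDEX of the largest value in the row, cast to float
    printf("index of max in row 1: %d\n", (int)getMax(&mat[1 * n], n));  // prints 1
    free(mat);
    return 0;
}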
mergeSort.h :-
#ifndef M_SORT
#define M_SORT
void merge(float arr[], int l, int m, int r, float *y)
{
    int i, j, k;
    int n1 = m - l + 1;
    int n2 = r - m;

    /* create temp arrays */
    float *L = getFloatMat(n1, 1);
    float *R = getFloatMat(n2, 1);
    float *Ly = getFloatMat(n1, 1);
    float *Ry = getFloatMat(n2, 1);

    /* Copy data to temp arrays L[] and R[] */
    for (i = 0; i < n1; i++)
    {
        L[i] = arr[l + i];
        Ly[i] = y[l + i];
    }
    for (j = 0; j < n2; j++)
    {
        R[j] = arr[m + 1 + j];
        Ry[j] = y[m + 1 + j];
    }

    /* Merge the temp arrays back into arr[l..r] */
    i = 0; // Initial index of first subarray
    j = 0; // Initial index of second subarray
    k = l; // Initial index of merged subarray
    while (i < n1 && j < n2)
    {
        if (L[i] <= R[j])
        {
            arr[k] = L[i];
            y[k] = Ly[i];
            i++;
        }
        else
        {
            arr[k] = R[j];
            y[k] = Ry[j];
            j++;
        }
        k++;
    }

    /* Copy the remaining elements of L[], if there are any */
    while (i < n1)
    {
        arr[k] = L[i];
        y[k] = Ly[i];
        i++;
        k++;
    }

    /* Copy the remaining elements of R[], if there are any */
    while (j < n2)
    {
        arr[k] = R[j];
        y[k] = Ry[j];
        j++;
        k++;
    }

    free(L);
    free(R);
    free(Ly);
    free(Ry);
}

/* l is for left index and r is right index of the
   sub-array of arr to be sorted */
void mergeSort(float arr[], int l, int r, float *y)
{
    if (l < r)
    {
        // Same as (l+r)/2, but avoids overflow for large l and h
        int m = l + (r - l) / 2;

        // Sort first and second halves
        mergeSort(arr, l, m, y);
        mergeSort(arr, m + 1, r, y);
        merge(arr, l, m, r, y);
    }
}

void printArray(float A[], int size)
{
    int i;
    for (i = 0; i < size; i++)
        printf("%f ", A[i]);
    printf("\n");
}
#endif
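(The way I understand mergeSort() here, it sorts arr in place and applies the same permutation to y, which is what keeps each distance paired with its label. A small standalone sketch with made-up values, not part of the repository:)
#include <stdio.h>
#include <stdlib.h>
#include "helper.h"
#include "mergeSort.h"

int main(void)
{
    float dist[]   = {3.2f, 0.5f, 1.7f, 0.9f};
    float labels[] = {1.0f, 0.0f, 1.0f, 0.0f};
    int n = 4;
    mergeSort(dist, 0, n - 1, labels);  // sorts dist, permutes labels in lockstep
    printArray(dist, n);                // 0.500000 0.900000 1.700000 3.200000
    printArray(labels, n);              // 0.000000 0.000000 1.000000 1.000000
    return 0;
}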
Finally, knnInMPI.c :-
#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <mpi.h>
#include <string.h>
#include "helper.h"
#include "mergeSort.h"
// config file, make changes here for different dataset
#include "config.h"
void mpiInitialise(int *size, int *rank)
{
    MPI_Init(NULL, NULL);
    MPI_Comm_rank(MPI_COMM_WORLD, rank);
    MPI_Comm_size(MPI_COMM_WORLD, size);
}

float *initFeatures(char path[])
{
    int index = 0;
    FILE *f = NULL;
    float *mat = NULL;
    mat = getFloatMat(NTRAIN, NFEATURES);
    f = fopen(path, "r");
    checkFile(f);
    while (fscanf(f, "%f%*c", &mat[index]) == 1) // %*c ignores the comma while reading the CSV
        index++;
    fclose(f);
    return mat;
}

float *initLabels(char path[])
{
    int index = 0;
    FILE *f = NULL;
    float *mat = NULL;
    mat = getFloatMat(NTRAIN, 1);
    f = fopen(path, "r");
    checkFile(f);
    while (fscanf(f, "%f%*c", &mat[index]) == 1)
        index++;
    fclose(f);
    return mat;
}

// Function to calculate Euclidean distance between two points
float calculateDistance(float *point1, float *point2, int features)
{
    float distance = 0.0;
    for (int i = 0; i < features; i++)
    {
        distance += (point1[i] - point2[i]) * (point1[i] - point2[i]);
    }
    return sqrt(distance);
}

// Function to perform majority voting and return the majority class label
int majorityVote(float *labels, int size)
{
    int *classCounts = (int *)calloc(NCLASSES, sizeof(int));
    for (int i = 0; i < size; i++)
    {
        int label = (int)labels[i];
        classCounts[label]++;
    }
    int maxCount = 0;
    int majorityClass = -1;
    for (int i = 0; i < NCLASSES; i++)
    {
        if (classCounts[i] > maxCount)
        {
            maxCount = classCounts[i];
            majorityClass = i;
        }
    }
    free(classCounts);
    return majorityClass;
}

void knn(char *X_train_path, char *y_train_path, char *X_test_path, char *y_test_path)
{
    printf("1");
    float *X_train;
    float *y_train;
    float *X_test;
    float *y_test;
    double t1, t2;
    int size, rank;
    printf("Started");
    mpiInitialise(&size, &rank);

    if (rank == 0)
    {
        X_train = initFeatures(X_train_path);
        y_train = initLabels(y_train_path);
    }
    X_test = initFeatures(X_test_path);
    y_test = initLabels(y_test_path);
    printf("Datasets loaded");

    if (rank == 0)
        t1 = MPI_Wtime();

    int nrows_per_process = NTRAIN / size;
    int ndata_per_process = nrows_per_process * NFEATURES;
    float *pdata = getFloatMat(ndata_per_process, 1);
    float *pdistance = getFloatMat(nrows_per_process, 1);
    float *plabels = getFloatMat(nrows_per_process, 1);
    int *assignedLabels = (int *)malloc(NTEST * sizeof(int));

    MPI_Scatter(X_train, ndata_per_process, MPI_FLOAT, pdata, ndata_per_process, MPI_FLOAT, 0, MPI_COMM_WORLD);

    for (int i = 0; i < NTEST; i++)
    {
        MPI_Scatter(y_train, nrows_per_process, MPI_FLOAT, plabels, nrows_per_process, MPI_FLOAT, 0, MPI_COMM_WORLD);
        float *x = getFloatMat(NFEATURES, 1);
        for (int j = 0; j < NFEATURES; j++)
            x[j] = X_test[i * NFEATURES + j];

        // Calculate distance
        for (int j = 0; j < nrows_per_process; j++)
        {
            pdistance[j] = calculateDistance(&pdata[j * NFEATURES], x, NFEATURES);
        }

        // Sort the distance array
        mergeSort(pdistance, 0, nrows_per_process - 1, plabels);

        MPI_Gather(pdistance, nrows_per_process, MPI_FLOAT, pdistance, nrows_per_process, MPI_FLOAT, 0, MPI_COMM_WORLD);
        MPI_Gather(plabels, nrows_per_process, MPI_FLOAT, plabels, nrows_per_process, MPI_FLOAT, 0, MPI_COMM_WORLD);

        if (rank == 0)
        {
            // Sort the global distance array
            mergeSort(pdistance, 0, NTRAIN - 1, plabels);
            // Perform majority voting and get the assigned label
            assignedLabels[i] = majorityVote(plabels, K);
            printf("%d) Predicted label: %d True label: %d\n\n", i, assignedLabels[i], (int)y_test[i]);
        }
        free(x);
    }

    free(pdata);
    free(pdistance);
    free(plabels);

    int *allAssignedLabels = NULL;
    if (rank == 0)
    {
        printf("reached here 2");
        allAssignedLabels = (int *)malloc(NTEST * sizeof(int));
    }

    // Gather assigned labels from all processes to process 0
    MPI_Gather(assignedLabels, NTEST, MPI_INT, allAssignedLabels, NTEST, MPI_INT, 0, MPI_COMM_WORLD);

    if (rank == 0)
    {
        // Evaluate the classifier by the accuracy measurement on Dtest
        int correctCount = 0;
        for (int i = 0; i < NTEST; i++)
        {
            if (allAssignedLabels[i] == (int)y_test[i])
            {
                correctCount++;
            }
        }
        double accuracy = (double)correctCount / NTEST;
        printf("Accuracy: %f\n", accuracy);
        free(allAssignedLabels);
    }

    if (rank == 0)
        t2 = MPI_Wtime();

    if (rank == 0)
    {
        printf("Time for execution (%d Processors): %f\n", size, t2 - t1);
        // free(X_train);
        // free(y_train);
    }
    // free(X_test);
    // free(y_test);
    // free(assignedLabels);
    MPI_Finalize();
}

int main()
{
    knn(X_TRAIN_PATH, Y_TRAIN_PATH, X_TEST_PATH, Y_TEST_PATH);
    return 0;
}
I compiled and ran with these commands:
'mpicc -o knnInMPI knnInMPI.c -lm', then 'mpirun -np 3 ./knnInMPI'.
When I run 'mpirun -np 1 ./knnInMPI' I get no errors and the code executes as expected, but when I replace 1 with 3 and run 'mpirun -np 3 ./knnInMPI' I get this error:
malloc(): corrupted top size
[bhanu-VirtualBox:02272] *** Process received signal ***
[bhanu-VirtualBox:02272] Signal: Aborted (6)
[bhanu-VirtualBox:02272] Signal code: (-6)
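(From what I have read, this glibc message means the heap allocator's metadata got corrupted, typically by writing past the end of a malloc/calloc'd buffer, and a later allocation or free then aborts. A tiny standalone sketch, unrelated to my code, that can trigger the same kind of abort:)
#include <stdlib.h>

int main(void)
{
    float *buf = calloc(4, sizeof(float));  // room for 4 floats
    for (int i = 0; i < 64; i++)            // writes far past the end of buf
        buf[i] = (float)i;
    float *next = malloc(64);               // glibc may detect the corruption here and abort
    free(next);
    free(buf);
    return 0;
}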
Where am I going wrong? Can someone help me with the code?
Thanks
A: No answers yet