
MPI Parallel Programming: Matrix Multiplication, Integral Calculation, PSRS Sorting


Introduction

MPI (Message Passing Interface) implements parallelism at the process level: processes cooperate by passing messages to one another. MPI is not a new programming language; it is a library of functions that can be called from C, C++, and Fortran programs, and those functions are mainly concerned with communication between pairs or groups of processes.

MPI is a cross-language communication protocol for programming parallel computers, supporting both point-to-point and broadcast communication. It is a message-passing application programming interface that includes protocol and semantic specifications describing how its features must behave in any implementation. MPI's goals are high performance, scalability, and portability, and it remains the dominant model in high-performance computing today.

The core MPI-1 model has no shared-memory concept, and MPI-2 adds only a limited distributed-shared-memory concept. Even so, MPI programs are frequently run on shared-memory machines. Designing a program around the MPI model, rather than directly against a NUMA architecture, often works out well because MPI encourages memory locality.

Although MPI belongs to layer 5 or higher of the OSI reference model, implementations may cover most of the lower layers, typically using sockets and the Transmission Control Protocol (TCP) at the transport layer. Most MPI implementations consist of a specified set of routines (an API) directly callable from C, C++, and Fortran, or from any language able to interface with such libraries, such as C#, Java, or Python. MPI's advantages over older message-passing libraries are its portability and speed.

Unlike OpenMP programs, MPI programs parallelize through message passing. The Message Passing Interface is a programming-interface standard rather than a concrete programming language; in short, the MPI standard defines a set of portable programming interfaces.
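Before the examples, the minimal skeleton below (my own sketch, not part of the original article) shows the structure every MPI program in this article shares: initialize the runtime, query the process rank and the total number of processes, communicate, and finalize.

#include <mpi.h>
#include <cstdio>

// Minimal MPI skeleton: initialize, query rank/size, finalize.
int main(int argc, char *argv[]) {
    MPI_Init(&argc, &argv);                    // start the MPI runtime

    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);      // id of this process: 0 .. size-1
    MPI_Comm_size(MPI_COMM_WORLD, &size);      // total number of processes

    printf("Hello from process %d of %d\n", rank, size);

    MPI_Finalize();                            // shut the runtime down
    return 0;
}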

Basic Usage

MPI is installed from a package downloaded from the official website. On Linux, compile with the MPI compiler wrapper:

mpicxx psrs.cpp -o psrs

Run it with:

mpirun -np 2 ./psrs  # 2 is the number of processes

Key Syntax

The key syntax is covered in detail in the following tutorial:

https://computing.llnl.gov/tutorials/mpi/

Main APIs:

[Figure: overview of the main MPI APIs]
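Since the original API overview is an image, here is a small substitute sketch of my own showing the two most basic point-to-point routines, MPI_Send and MPI_Recv; the collective operations used later (MPI_Bcast, MPI_Scatter, MPI_Gather, MPI_Reduce, MPI_Alltoall) follow the same buffer/count/datatype/communicator pattern.

#include <mpi.h>
#include <cstdio>

// Point-to-point sketch: rank 0 sends one integer to rank 1.
// Run with at least two processes, e.g. mpirun -np 2 ./a.out
int main(int argc, char *argv[]) {
    MPI_Init(&argc, &argv);
    int rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if (rank == 0) {
        int value = 42;
        // (buffer, count, datatype, destination rank, tag, communicator)
        MPI_Send(&value, 1, MPI_INT, 1, 0, MPI_COMM_WORLD);
    } else if (rank == 1) {
        int value;
        MPI_Recv(&value, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        printf("rank 1 received %d from rank 0\n", value);
    }

    MPI_Finalize();
    return 0;
}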

Programming Examples

Integral Calculation

double Trap(double a, double b, int n, double(*f)(double)){

    double h = (b - a) / n;   // width of each sub-interval

    int rank, size;

    MPI_Comm_size(MPI_COMM_WORLD,&size);
    MPI_Comm_rank(MPI_COMM_WORLD,&rank);

    int local_n;
    double local_a, local_b;
    double local_sum = 0.0, total_sum = 0.0;   // accumulators must start at zero

    local_n = n/size; // n must be an integer multiple of size
    local_a = a + rank*local_n*h;
    local_b = local_a + local_n*h;

    // Left-endpoint rectangle rule over this process's sub-interval [local_a, local_b).
    for (int k = 0; k < local_n; ++k) {
        local_sum += f(local_a + k*h);
    }

    local_sum *= h;
//    printf("process %d : local_sum = %f\n", rank, local_sum);

    MPI_Reduce(&local_sum, &total_sum, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);

    // MPI_Reduce delivers the result only to rank 0; total_sum is undefined on other ranks.
    return total_sum;

}
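The article does not show a driver for Trap. A hypothetical caller might look like the sketch below; the integrand f and the interval [0, 1] are placeholders of mine, and Trap from above is assumed to be defined earlier in the same file.

#include <mpi.h>
#include <cstdio>

double f(double x) { return x * x; }            // placeholder integrand

int main(int argc, char *argv[]) {
    MPI_Init(&argc, &argv);
    int rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    // Integrate x^2 over [0, 1]; n must be divisible by the number of processes.
    double result = Trap(0.0, 1.0, 1 << 20, f);

    if (rank == 0)                              // only rank 0 holds the reduced sum
        printf("integral ~= %f (exact value is 1/3)\n", result);

    MPI_Finalize();
    return 0;
}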

Matrix-Vector Multiplication

// Serial kernel: y = A * x, where A is an m x n block stored row-major.
void Mat_vect_mult(
                    int A[],
                    int x[],
                    int y[],
                    const int &m,
                    const int &n
        ){
    int i, j;
    for (i = 0; i < m; ++i) {
        y[i] = 0;
        for (j = 0; j < n; ++j) {
            y[i] += A[i*n + j]*x[j];
        }
    }
}

void Solve(){

    int rank, size;

    MPI_Comm_size(MPI_COMM_WORLD,&size);
    MPI_Comm_rank(MPI_COMM_WORLD,&rank);

    int m = M, n = N;                 // M and N are assumed to be defined elsewhere (matrix dimensions)
    int *A = NULL, *y = NULL;         // allocated only on the root process

    if(rank == 0){
        int fin = open("testData", O_RDONLY);   // the binary data file must already exist
        A = (int *)malloc(sizeof(int) * m * n);
        read(fin, A, m * n * sizeof(int));
        close(fin);
    }

    int *subMat = (int *)malloc(sizeof(int) * m * n / size);
    int *local_y = (int*)malloc(m/size * sizeof(int));
    int *x = (int *)malloc(sizeof(int) * n);
    for (int i = 0; i < n; ++i) {
        x[i] = 1;
    }

    // Scatter m/size consecutive rows of A to each process (row-major layout)
    MPI_Scatter(A, m/size*n, MPI_INT, subMat, m/size*n, MPI_INT, 0, MPI_COMM_WORLD);


    Mat_vect_mult(subMat, x, local_y, m/size, n);

    if(rank == 0){
        y = (int *)malloc(sizeof(int) * m);
    }

    MPI_Gather(local_y, M/size, MPI_INT, y, M/size, MPI_INT, 0, MPI_COMM_WORLD);

//    if(rank == 0){
//        printf("Final Result:[");
//        int i;
//        for(i = 0 ; i < M-1 ; i++)
//            printf("%d,",y[i]);
//        printf("%d]\n",y[i]);
//    }

    free(local_y);
    free(subMat);
    free(x);
    if(rank == 0){
        free(y);
        free(A);
    }

    return ;
}
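Solve() assumes that MPI is already initialized, that M and N are defined, and that M is divisible by the number of processes. A hypothetical driver (mine, not from the article) could look like this:

#include <mpi.h>

#define M 1024    // placeholder dimensions; M must be divisible by the process count
#define N 1024

// Solve() and Mat_vect_mult() from above are assumed to be in the same file.
int main(int argc, char *argv[]) {
    MPI_Init(&argc, &argv);
    Solve();              // rank 0 reads and scatters A, all ranks multiply, rank 0 gathers y
    MPI_Finalize();
    return 0;
}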

PSRS Sorting

//
// Created by wesley on 18-6-23.
//
#include <iostream>
#include <stdlib.h>
#include <stdio.h>
#include <limits.h>
#include <assert.h>
#include <sys/time.h>
#include <unistd.h>
#include "mpi.h"
#include <fcntl.h>
#include <time.h>
#include <benchmark/benchmark.h>
int i,j,k;

// How to run: mpicxx psrs.cpp -o psrs -lbenchmark -lpthread -fopenmp && mpirun -np 2 ./psrs


int N = 1024*1024*1024/4;   // 256M ints, i.e. 1 GB of data

int *array;

int cmp(const void * a, const void * b) {
    if (*(int*)a < *(int*)b) return -1;
    if (*(int*)a > *(int*)b) return 1;
    else return 0;
}

void phase1(int *array, int N, int startIndex, int subArraySize, int *pivots, int p) {
    // Locally sort this process's sub-array.
    qsort(array + startIndex, subArraySize, sizeof(array[0]), cmp);

    // Regular sampling: take p evenly spaced samples from the sorted sub-array.
    for (i = 0; i < p; i++) {
        pivots[i] = array[startIndex + (i * (N / (p * p)))];
    }
    return;
}

void phase2(int *array, int startIndex, int subArraySize, int *pivots, int *partitionSizes, int p, int myId) {
    int *collectedPivots = (int *) malloc(p * p * sizeof(pivots[0]));
    int *phase2Pivots = (int *) malloc((p - 1) * sizeof(pivots[0]));          // the p-1 pivots chosen by the root
    int index = 0;

    // Gather the samples: the root's receive buffer is the concatenation of every process's p samples.
    MPI_Gather(pivots, p, MPI_INT, collectedPivots, p, MPI_INT, 0, MPI_COMM_WORLD);
    if (myId == 0) {

        qsort(collectedPivots, p * p, sizeof(pivots[0]), cmp);          // sort the gathered regular samples

        // Choose the p-1 pivots from the sorted samples.
        for (i = 0; i < (p -1); i++) {
            phase2Pivots[i] = collectedPivots[(((i+1) * p) + (p / 2)) - 1];
        }
    }
    // Broadcast the chosen pivots to every process.
    MPI_Bcast(phase2Pivots, p - 1, MPI_INT, 0, MPI_COMM_WORLD);
    // Partition the locally sorted sub-array by the p-1 pivots and count each partition's size.
    // Because the sub-array is already sorted, one forward scan is enough; bounding index by
    // p - 1 also keeps the pivot lookup inside phase2Pivots for the last partition.
    for ( i = 0; i < subArraySize; i++) {
        while (index < p - 1 && array[startIndex + i] > phase2Pivots[index]) {
            index += 1;             // the current value belongs to a later partition
        }
        partitionSizes[index]++;    // one more element in partition 'index'
    }
    free(collectedPivots);
    free(phase2Pivots);
    return;
}

void phase3(int *array, int startIndex, int *partitionSizes, int **newPartitions, int *newPartitionSizes, int p) {
    int totalSize = 0;
    int *sendDisp = (int *) malloc(p * sizeof(int));
    int *recvDisp = (int *) malloc(p * sizeof(int));

    // All-to-all exchange of counts: each process learns how many elements it will receive from every other process.
    MPI_Alltoall(partitionSizes, 1, MPI_INT, newPartitionSizes, 1, MPI_INT, MPI_COMM_WORLD);

    // Compute the total amount of incoming data and allocate space for the new partition.
    for ( i = 0; i < p; i++) {
        totalSize += newPartitionSizes[i];
    }
    *newPartitions = (int *) malloc(totalSize * sizeof(int));

    // Displacements into sendbuf: where each outgoing partition starts.
    sendDisp[0] = 0;
    recvDisp[0] = 0;      // displacements into recvbuf: where each incoming partition will land
    for ( i = 1; i < p; i++) {
        sendDisp[i] = partitionSizes[i - 1] + sendDisp[i - 1];
        recvDisp[i] = newPartitionSizes[i - 1] + recvDisp[i - 1];
    }

    // Exchange the partition data itself; conceptually p*p point-to-point transfers in one call.
    MPI_Alltoallv(&(array[startIndex]), partitionSizes, sendDisp, MPI_INT, *newPartitions, newPartitionSizes, recvDisp, MPI_INT, MPI_COMM_WORLD);

    free(sendDisp);
    free(recvDisp);
    return;
}

void phase4(int *partitions, int *partitionSizes, int p, int myId, int *array) {
    int *sortedSubList;
    int *recvDisp, *indexes, *partitionEnds, *subListSizes, totalListSize;

    indexes = (int *) malloc(p * sizeof(int));
    partitionEnds = (int *) malloc(p * sizeof(int));
    indexes[0] = 0;
    totalListSize = partitionSizes[0];
    for ( i = 1; i < p; i++) {
        totalListSize += partitionSizes[i];
        indexes[i] = indexes[i-1] + partitionSizes[i-1];
        partitionEnds[i-1] = indexes[i];
    }
    partitionEnds[p - 1] = totalListSize;

    sortedSubList = (int *) malloc(totalListSize * sizeof(int));
    subListSizes = (int *) malloc(p * sizeof(int));
    recvDisp = (int *) malloc(p * sizeof(int));

    // p-way merge of the incoming partitions (each is already sorted).
    for ( i = 0; i < totalListSize; i++) {
        int lowest = INT_MAX;
        int ind = -1;
        for (j = 0; j < p; j++) {
            if ((indexes[j] < partitionEnds[j]) && (partitions[indexes[j]] < lowest)) {
                lowest = partitions[indexes[j]];
                ind = j;
            }
        }
        sortedSubList[i] = lowest;
        indexes[ind] += 1;
    }

    // Send the size of each merged sub-list back to the root.
    MPI_Gather(&totalListSize, 1, MPI_INT, subListSizes, 1, MPI_INT, 0, MPI_COMM_WORLD);

    // On the root, compute the displacement into recvbuf for each incoming sub-list.
    if (myId == 0) {
        recvDisp[0] = 0;
        for ( i = 1; i < p; i++) {
            recvDisp[i] = subListSizes[i - 1] + recvDisp[i - 1];
        }
    }

    // Gather the sorted sub-lists back into array on the root; the result is globally sorted.
    MPI_Gatherv(sortedSubList, totalListSize, MPI_INT, array, subListSizes, recvDisp, MPI_INT, 0, MPI_COMM_WORLD);

    free(partitionEnds);
    free(sortedSubList);
    free(indexes);
    free(subListSizes);
    free(recvDisp);
    return;
}

// PSRS driver: runs the four phase functions above.
void psrs_mpi(int *array, int N)
{
    int p, myId, *partitionSizes, *newPartitionSizes, nameLength;
    int subArraySize, startIndex, endIndex, *pivots, *newPartitions;
    char processorName[MPI_MAX_PROCESSOR_NAME];


    MPI_Comm_size(MPI_COMM_WORLD,&p);
    MPI_Comm_rank(MPI_COMM_WORLD,&myId);
    MPI_Get_processor_name(processorName,&nameLength);

//    printf("Process %d is on %s\n",myId, processorName);

    pivots = (int *) malloc(p*sizeof(int));
    partitionSizes = (int *) malloc(p*sizeof(int));
    newPartitionSizes = (int *) malloc(p*sizeof(int));
    for ( k = 0; k < p; k++) {
        partitionSizes[k] = 0;
    }

    // Compute this process's start index and sub-array size.
    startIndex = myId * N / p;
    if (p == (myId + 1)) {
        endIndex = N;
    }
    else {
        endIndex = (myId + 1) * N / p;
    }
    subArraySize = endIndex - startIndex;

    MPI_Barrier(MPI_COMM_WORLD);
    // Run the phases.
    phase1(array, N, startIndex, subArraySize, pivots, p);
    if (p > 1) {
        phase2(array, startIndex, subArraySize, pivots, partitionSizes, p, myId);
        phase3(array, startIndex, partitionSizes, &newPartitions, newPartitionSizes, p);
        phase4(newPartitions, newPartitionSizes, p, myId, array);
    }
//
//    if (myId == 0)
//        for(k = 0; k < N; k++){
//            printf("%d ",array[k]);
//        }
//    printf("\n");
    if (p > 1) {
        free(newPartitions);
    }
    free(partitionSizes);
    free(newPartitionSizes);
    free(pivots);


//    free(array);

}

//
//static void BM_matrix_mul(benchmark::State& state) {
//    while (state.KeepRunning()){
//        psrs_mpi(array, N);
//    }
//}

//BENCHMARK(BM_matrix_mul);

int main(int argc, char *argv[]) {

    array = (int *) malloc(N*sizeof(int));
    int fin = open("testArr_1G_", O_RDONLY);   // the binary test file must already exist
    read(fin, array, N * sizeof(int));
    close(fin);

    MPI_Init(&argc,&argv);
    double start = MPI_Wtime();   // wall-clock time; clock() would only report CPU time
//
//    ::benchmark::Initialize(&argc, argv);
//    if (::benchmark::ReportUnrecognizedArguments(argc, argv)) return 1;
//    ::benchmark::RunSpecifiedBenchmarks();
    psrs_mpi(array, N);
    double duration = MPI_Wtime() - start;
    printf("Cost %f seconds\n", duration);
    free(array);
    MPI_Finalize();

    return 0;
}
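main() reads a raw binary file of N ints named testArr_1G_, but the article does not include a generator for it. A simple hypothetical generator, matching the element count used above, might be:

// Hypothetical generator for the binary input file read by psrs.cpp above.
#include <cstdio>
#include <cstdlib>

int main() {
    const long n = 1024L * 1024 * 1024 / 4;     // same element count as N in psrs.cpp (~1 GB)
    FILE *out = fopen("testArr_1G_", "wb");
    if (!out) return 1;
    for (long i = 0; i < n; ++i) {
        int v = rand();                          // unsorted random test data
        fwrite(&v, sizeof(int), 1, out);
    }
    fclose(out);
    return 0;
}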

Full source code: https://github.com/ZezhongWang/BlogCode

Tags: parallel computing, MPI