
MPI Example: Median Filtering, Part 2


Preface

A while ago I wrote "MPI Example: Median Filtering". The timings there showed the parallel version actually running slower than the serial one, because the program spent too much time on inter-process communication. I have since replaced the point-to-point communication with collective communication, which cuts the communication time dramatically.


Code

The modified code is as follows:

#include <gdal_priv.h>
#include <iostream>
#include <cstdio>   // printf()
#include <cstring>  // memcpy()
#include <ctime>    // time()
#include "mpi.h"

using namespace std;

void _medianfilter(const unsigned char* image, unsigned char* result, int width, int height); // median filter


int main(int argc, char *argv[])
{
    time_t start, stop; // coarse timing with time(); resolution is one second
    start = time(NULL);
    const char *inPath = "/home/ubuntu/data/GF1_WFV3_E115.7_N28.9_20170729_L1A0002514125.tiff"; // input image path
    //const char *inPath = "/home/ubuntu/data/1.bmp";
    //const char *outPath = "/home/ubuntu/data/2.bmp";
    const char *outPath = "/home/ubuntu/data/gdal_out.tiff"; // output path
    int nImgSizeX, nImgSizeY, bandcount;
    int rank, size;
    int interval; // image rows per process; the image is split into horizontal strips by height
    int i, j;
    // allocate memory
    unsigned char **pImgData; // full image, one buffer per band, held by the root
    pImgData = NULL;
    unsigned char **data; // each process's strip of input data
    data = NULL;
    unsigned char **result; // each process's strip of filtered output
    result = NULL;
    unsigned char **final_data; // filtered image reassembled at the root
    final_data = NULL;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank); // rank is this process's ID
    MPI_Comm_size(MPI_COMM_WORLD, &size); // size is the number of processes launched
    if (!rank) { // root process (rank 0)
        // open the image
        GDALDataset *poDataset;
        GDALAllRegister();
        poDataset = (GDALDataset *)GDALOpen(inPath, GA_ReadOnly);
        if (poDataset == NULL)
        {
            cout << "failed to open input image" << endl;
            MPI_Abort(MPI_COMM_WORLD, 1); // cannot continue without input data
        }

        nImgSizeX = poDataset->GetRasterXSize();     // image width in pixels
        nImgSizeY = poDataset->GetRasterYSize();     // image height in pixels
        bandcount = poDataset->GetRasterCount();     // number of bands

        pImgData = new unsigned char *[bandcount];
        for (i = 0; i < bandcount; i++)
            pImgData[i] = new unsigned char[nImgSizeX * nImgSizeY]; // original data, one buffer per band

        // split the image: abort unless the height divides evenly among the processes
        if (nImgSizeY%size)
            MPI_Abort(MPI_COMM_WORLD, 1);
        else
            interval = nImgSizeY / size;

        for (i = 1; i <= bandcount; i++)
        {
            GDALRasterBand * pInRasterBand1 = poDataset->GetRasterBand(i);
            CPLErr error;
            error = pInRasterBand1->RasterIO(GF_Read, 0, 0, nImgSizeX, nImgSizeY, pImgData[i - 1], nImgSizeX, nImgSizeY, GDT_Byte, 0, 0);
            if (error == CE_Failure)
            {
                cout << "读取图像数据时出错!" << endl;
                GDALDestroyDriverManager();
            }
        }
        if (poDataset != NULL)
            delete poDataset;
    }
    // rank 0 broadcasts the image metadata to every process
    MPI_Bcast(&nImgSizeX, 1, MPI_INT, 0, MPI_COMM_WORLD); 
    MPI_Bcast(&nImgSizeY, 1, MPI_INT, 0, MPI_COMM_WORLD);
    MPI_Bcast(&bandcount, 1, MPI_INT, 0, MPI_COMM_WORLD);
    MPI_Bcast(&interval, 1, MPI_INT, 0, MPI_COMM_WORLD);

    // MPI_Scatter reads the send buffer only on the root, but every rank indexes
    // pImgData[i] in the call below, so the pointer array must exist everywhere
    if (rank) {
        pImgData = new unsigned char *[bandcount];
        for (i = 0; i < bandcount; i++)
            pImgData[i] = NULL; // never read on non-root ranks
    }

    // every process allocates space for its strip and for the final result
    data = new unsigned char *[bandcount];
    result = new unsigned char *[bandcount];
    for (i = 0; i < bandcount; i++) {
        data[i] = new unsigned char[nImgSizeX * nImgSizeY / size];
        result[i] = new unsigned char[nImgSizeX * nImgSizeY / size];
    }
    final_data = new unsigned char *[bandcount];
    for (i = 0; i < bandcount; i++)
        final_data[i] = new unsigned char[nImgSizeX * nImgSizeY];

    // rank 0 scatters the data to all processes with the collective MPI_Scatter();
    // note the buffers themselves (pImgData[i], data[i]) are passed, not their addresses
    for (i = 0; i < bandcount; i++)
        MPI_Scatter(pImgData[i], nImgSizeX * nImgSizeY / size, MPI_UNSIGNED_CHAR,
                    data[i], nImgSizeX * nImgSizeY / size, MPI_UNSIGNED_CHAR, 0, MPI_COMM_WORLD);

    MPI_Barrier(MPI_COMM_WORLD);
    for (i = 0; i < bandcount; i++) {
        // copy the strip first so the 1-pixel border the filter skips keeps its
        // original values instead of being written out uninitialized
        memcpy(result[i], data[i], (size_t)nImgSizeX * interval);
        // note: strips are filtered independently with no halo (ghost-row) exchange,
        // so the rows at strip boundaries stay unfiltered
        _medianfilter(data[i], result[i], nImgSizeX, interval);
    }

    MPI_Barrier(MPI_COMM_WORLD); // synchronize before gathering
    // gather: collect every process's result (including the root's own) at the root, in rank order
    for (i = 0; i < bandcount; i++)
        MPI_Gather(result[i], nImgSizeX * nImgSizeY / size, MPI_UNSIGNED_CHAR, final_data[i], nImgSizeX * nImgSizeY / size, MPI_UNSIGNED_CHAR, 0, MPI_COMM_WORLD); // collected at rank 0

    MPI_Barrier(MPI_COMM_WORLD);  
    // the root process writes the result
    if (!rank)
    {
        GDALDataset *poDstDS;
        const char *pszFormat = "GTiff";
        //const char *pszFormat = "BMP";
        GDALDriver *poDriver;
        poDriver = GetGDALDriverManager()->GetDriverByName(pszFormat); // get the output driver
        if (poDriver == NULL)
            exit(1);

        // create an output image with bandcount bands
        poDstDS = poDriver->Create(outPath, nImgSizeX, nImgSizeY, bandcount, GDT_Byte, NULL);

        // write the gathered data into the output image
        for (i = 1; i <= bandcount; i++)
        {
            poDstDS->GetRasterBand(i)->RasterIO(GF_Write, 0, 0, nImgSizeX, nImgSizeY,
                final_data[i - 1], nImgSizeX, nImgSizeY, GDT_Byte, 0, 0);
        }

        if (poDstDS != NULL)
            delete poDstDS;
    }
    MPI_Finalize();
    stop = time(NULL);
    if (!rank) // only the root reports the elapsed time
        printf("Use Time:%ld\n", (stop - start));
    return 0;
}

void _medianfilter(const unsigned char* image, unsigned char* result, int width, int height) // median filter
{
    //   Move a 3x3 window over every interior pixel (the 1-pixel border is skipped)
    for (int m = 1; m < height - 1; ++m)
        for (int n = 1; n < width - 1; ++n)
        {
            //   Pick up window elements
            int k = 0;
            unsigned char window[9];
            for (int j = m - 1; j < m + 2; ++j)
                for (int i = n - 1; i < n + 2; ++i)
                    window[k++] = image[j * width + i];
            //   Partial selection sort: ordering the first 5 of the 9 elements is enough to locate the median
            for (int j = 0; j < 5; ++j)
            {
                //   Find position of minimum element
                int min = j;
                for (int l = j + 1; l < 9; ++l)
                    if (window[l] < window[min])
                        min = l;
                //   Put found minimum element in its place
                const unsigned char temp = window[j];
                window[j] = window[min];
                window[min] = temp; 
            }
            //   Get result - the middle element
            result[m * width + n] = window[4];
        }
}
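
To try it out, a typical build and run might look like the following; this is just a sketch assuming an MPI implementation (e.g. OpenMPI) and the GDAL development files are installed, and the source file name is arbitrary. Remember that the image height must be divisible by the number of processes, or the program aborts:

mpicxx -o medianfilter medianfilter.cpp $(gdal-config --cflags) $(gdal-config --libs)
mpirun -np 8 ./medianfilter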

Results

Below are my test results; see my previous post for the testing procedure.

(Figure: run time and speedup versus number of processes)

As the figure shows, with 8 processes or fewer the speedup is almost linear, which is exactly the behavior we hoped for. As the process count grows further, communication takes up a larger share of the total time, so the run with 20 processes actually takes longer.
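
A side note on the measurement: time() only resolves whole seconds, and the figure above also includes the GDAL I/O. To time just the parallel part more precisely, the region of interest could be bracketed with MPI_Wtime(), roughly like this:

    double t0 = MPI_Wtime(); // wall-clock seconds; resolution is given by MPI_Wtick()
    // ... MPI_Scatter, _medianfilter, MPI_Gather ...
    double t1 = MPI_Wtime();
    if (!rank)
        printf("parallel section: %f s\n", t1 - t0);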


Summary

1. Compared with the previous post, the point-to-point communication has been replaced with collective communication. The old version sent the data one byte at a time:

if (!rank) {  // rank 0 sends each process its share of the data, one byte at a time
    for (i = 0; i < bandcount; i++)
    {
        for (j = nImgSizeX*nImgSizeY/size; j < nImgSizeX * nImgSizeY; j++)
        {
            MPI_Send(&pImgData[i][j], 1, MPI_UNSIGNED_CHAR, j/(nImgSizeX*nImgSizeY/size), 99, MPI_COMM_WORLD);
        }
    }
}
else {  // the other processes receive their data from rank 0
    for (i = 0; i < bandcount; i++) {
        for (j = 0; j < nImgSizeX*nImgSizeY / size; j++) {
            MPI_Recv(&data[i][j], 1, MPI_UNSIGNED_CHAR, 0, 99, MPI_COMM_WORLD, &status);
        }
    }
}

was changed to:

// rank 0 scatters the data to all processes with the collective MPI_Scatter()
for (i = 0; i < bandcount; i++)
    MPI_Scatter(pImgData[i], nImgSizeX * nImgSizeY / size, MPI_UNSIGNED_CHAR,
                data[i], nImgSizeX * nImgSizeY / size, MPI_UNSIGNED_CHAR, 0, MPI_COMM_WORLD);

2. Collective communication cuts the communication time dramatically: instead of paying per-message latency for one MPI_Send per byte, each band now moves in a single large message per process. The collectives used here, MPI_Scatter() and MPI_Gather(), transfer the same amount of data to or from every process. If the data cannot be split evenly, MPI_Scatterv() and MPI_Gatherv() can be used instead; their parameters include an array of counts specifying how much data each process sends or receives (see the official MPI documentation for details), as sketched below.
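As an illustration, here is a hypothetical sketch of splitting the rows unevenly with MPI_Scatterv(); the counts/displs arrays are illustrative names, not variables from the program above:

    // counts[r]: how many pixels rank r receives; displs[r]: where rank r's
    // block starts in the root's band buffer (both measured in elements)
    int *counts = new int[size];
    int *displs = new int[size];
    int base = nImgSizeY / size, rem = nImgSizeY % size, offset = 0;
    for (int r = 0; r < size; r++) {
        int rows = base + (r < rem ? 1 : 0); // spread the leftover rows over the first ranks
        counts[r] = rows * nImgSizeX;
        displs[r] = offset;
        offset += counts[r];
    }
    // each rank's receive buffer must hold at least counts[rank] bytes
    MPI_Scatterv(pImgData[i], counts, displs, MPI_UNSIGNED_CHAR,
                 data[i], counts[rank], MPI_UNSIGNED_CHAR, 0, MPI_COMM_WORLD);
    // MPI_Gatherv mirrors this on the way back:
    // MPI_Gatherv(result[i], counts[rank], MPI_UNSIGNED_CHAR,
    //             final_data[i], counts, displs, MPI_UNSIGNED_CHAR, 0, MPI_COMM_WORLD);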
3. If I continue studying MPI, I will keep writing up what I learn. If you have any questions, feel free to contact me by email: aaa@qq.com