MPI Example: Median Filtering, Part 2
Preface
A while ago I wrote "MPI Example: Median Filtering". The results I measured then showed the parallel runtime exceeding the serial runtime, because the program spent too much time on inter-process communication. I have since replaced the point-to-point communication with collective communication, which greatly reduces the communication time.
Code
The modified code is as follows:
#include <gdal_priv.h>
#include <iostream>
#include <ctime>    // time()
#include <cstring>  // memcpy()
#include "mpi.h"
using namespace std;
void _medianfilter(const unsigned char* image, unsigned char* result, int width, int height);  // median filter

int main(int argc, char *argv[])
{
    time_t start, stop;  // coarse wall-clock timing with time() (1-second resolution)
    start = time(NULL);
    const char *inPath = "/home/ubuntu/data/GF1_WFV3_E115.7_N28.9_20170729_L1A0002514125.tiff";  // input image path
    //const char *inPath = "/home/ubuntu/data/1.bmp";
    //const char *outPath = "/home/ubuntu/data/2.bmp";
    const char *outPath = "/home/ubuntu/data/gdal_out.tiff";  // output path
    int nImgSizeX, nImgSizeY, bandcount;
    int rank, size;
    int interval;  // number of image rows assigned to each process; the image is split by rows only
    int i;
    // Buffers
    unsigned char **pImgData = NULL;    // full image data, read by the root process
    unsigned char **data = NULL;        // each process's block of input rows
    unsigned char **result = NULL;      // each process's filtered block
    unsigned char **final_data = NULL;  // gathered result on the root process
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);  // rank of this process
    MPI_Comm_size(MPI_COMM_WORLD, &size);  // number of processes the program was launched with
    if (!rank) {  // root process (rank 0)
        // Open the input image
        GDALAllRegister();
        GDALDataset *poDataset = (GDALDataset *)GDALOpen(inPath, GA_ReadOnly);
        if (poDataset == NULL)
        {
            cout << "Failed to open the input image" << endl;
            MPI_Abort(MPI_COMM_WORLD, 1);
        }
        nImgSizeX = poDataset->GetRasterXSize();  // number of pixels per row
        nImgSizeY = poDataset->GetRasterYSize();  // number of rows
        bandcount = poDataset->GetRasterCount();  // number of bands
        pImgData = new unsigned char *[bandcount];
        for (i = 0; i < bandcount; i++)
            pImgData[i] = new unsigned char[nImgSizeX * nImgSizeY];  // raw data of each band
        // Split the image: the height must be divisible by the number of processes
        if (nImgSizeY % size)
            MPI_Abort(MPI_COMM_WORLD, 1);
        else
            interval = nImgSizeY / size;
        for (i = 1; i <= bandcount; i++)
        {
            GDALRasterBand *pInRasterBand1 = poDataset->GetRasterBand(i);
            CPLErr error = pInRasterBand1->RasterIO(GF_Read, 0, 0, nImgSizeX, nImgSizeY,
                pImgData[i - 1], nImgSizeX, nImgSizeY, GDT_Byte, 0, 0);
            if (error == CE_Failure)
            {
                cout << "Error while reading the image data!" << endl;
                GDALDestroyDriverManager();
            }
        }
        if (poDataset != NULL)
            delete poDataset;
    }
    // Rank 0 broadcasts the image metadata to every process
    MPI_Bcast(&nImgSizeX, 1, MPI_INT, 0, MPI_COMM_WORLD);
    MPI_Bcast(&nImgSizeY, 1, MPI_INT, 0, MPI_COMM_WORLD);
    MPI_Bcast(&bandcount, 1, MPI_INT, 0, MPI_COMM_WORLD);
    MPI_Bcast(&interval, 1, MPI_INT, 0, MPI_COMM_WORLD);
    // Non-root processes also allocate pImgData so that the send-buffer argument
    // pImgData[i] passed to MPI_Scatter below is valid (MPI ignores it on non-root ranks)
    if (rank) {
        pImgData = new unsigned char *[bandcount];
        for (i = 0; i < bandcount; i++)
            pImgData[i] = new unsigned char[nImgSizeX * nImgSizeY];
    }
    // Every process allocates space for its block of the data set
    data = new unsigned char *[bandcount];
    result = new unsigned char *[bandcount];
    for (i = 0; i < bandcount; i++) {
        data[i] = new unsigned char[nImgSizeX * nImgSizeY / size];
        result[i] = new unsigned char[nImgSizeX * nImgSizeY / size];
    }
    final_data = new unsigned char *[bandcount];
    for (i = 0; i < bandcount; i++)
        final_data[i] = new unsigned char[nImgSizeX * nImgSizeY];
    // Rank 0 distributes the data to all processes with the collective MPI_Scatter()
    for (i = 0; i < bandcount; i++)
        MPI_Scatter(pImgData[i], nImgSizeX * nImgSizeY / size, MPI_UNSIGNED_CHAR,
                    data[i], nImgSizeX * nImgSizeY / size, MPI_UNSIGNED_CHAR, 0, MPI_COMM_WORLD);
    MPI_Barrier(MPI_COMM_WORLD);
    for (i = 0; i < bandcount; i++) {
        // Copy the block first so the unfiltered border pixels keep their original values
        memcpy(result[i], data[i], nImgSizeX * nImgSizeY / size);
        _medianfilter(data[i], result[i], nImgSizeX, interval);
    }
    MPI_Barrier(MPI_COMM_WORLD);  // synchronize before gathering
    // Gather: every process (including the root itself) sends its block back to the root in rank order
    for (i = 0; i < bandcount; i++)
        MPI_Gather(result[i], nImgSizeX * nImgSizeY / size, MPI_UNSIGNED_CHAR,
                   final_data[i], nImgSizeX * nImgSizeY / size, MPI_UNSIGNED_CHAR, 0, MPI_COMM_WORLD);  // collected on rank 0
    MPI_Barrier(MPI_COMM_WORLD);
    // The root process writes out the result
    if (!rank)
    {
        const char *pszFormat = "GTiff";
        //const char *pszFormat = "BMP";
        GDALDriver *poDriver = GetGDALDriverManager()->GetDriverByName(pszFormat);  // get the output driver
        if (poDriver == NULL)
            exit(1);
        // Create an output image with bandcount bands
        GDALDataset *poDstDS = poDriver->Create(outPath, nImgSizeX, nImgSizeY, bandcount, GDT_Byte, NULL);
        // Write the gathered data into the output image
        for (i = 1; i <= bandcount; i++)
        {
            poDstDS->GetRasterBand(i)->RasterIO(GF_Write, 0, 0, nImgSizeX, nImgSizeY,
                final_data[i - 1], nImgSizeX, nImgSizeY, GDT_Byte, 0, 0);
        }
        if (poDstDS != NULL)
            delete poDstDS;
    }
    MPI_Finalize();
    stop = time(NULL);
    printf("Use Time: %ld s\n", (long)(stop - start));
    return 0;
}
void _medianfilter(const unsigned char* image, unsigned char* result, int width, int height)  // median filter
{
    // Move the window through all elements of the image
    for (int m = 1; m < height - 1; ++m)
        for (int n = 1; n < width - 1; ++n)
        {
            // Pick up window elements
            int k = 0;
            unsigned char window[9];
            for (int j = m - 1; j < m + 2; ++j)
                for (int i = n - 1; i < n + 2; ++i)
                    window[k++] = image[j * width + i];
            // Order elements (only half of them) by selection sort
            for (int j = 0; j < 5; ++j)
            {
                // Find the position of the minimum element
                int min = j;
                for (int l = j + 1; l < 9; ++l)
                    if (window[l] < window[min])
                        min = l;
                // Put the found minimum element in its place
                const unsigned char temp = window[j];
                window[j] = window[min];
                window[min] = temp;
            }
            // The result is the middle element
            result[m * width + n] = window[4];
        }
}
Results
Below are my test results; see my previous post for the detailed testing procedure.
As the figure shows, the speedup is almost linear with 8 processes or fewer, which is the effect we were hoping for. As the number of processes grows further, communication accounts for an increasing share of the runtime, so the run with 20 processes actually takes longer.
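To see how the time splits between communication and computation, the two phases can be timed separately with MPI_Wtime(), which gives much finer wall-clock resolution than the time(NULL) calls in the program above. Below is a minimal, self-contained sketch of that pattern (not part of the original program); the chunk size and the dummy per-element summation are placeholders standing in for the image block and the median filter:
#include <mpi.h>
#include <vector>
#include <cstdio>

int main(int argc, char *argv[])
{
    MPI_Init(&argc, &argv);
    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    const int chunk = 1 << 20;                      // placeholder block size per process
    std::vector<unsigned char> sendbuf;             // significant only on rank 0
    if (rank == 0) sendbuf.resize((size_t)chunk * size, 1);
    std::vector<unsigned char> recvbuf(chunk);

    // Time the communication phase
    double t0 = MPI_Wtime();
    MPI_Scatter(sendbuf.data(), chunk, MPI_UNSIGNED_CHAR,
                recvbuf.data(), chunk, MPI_UNSIGNED_CHAR, 0, MPI_COMM_WORLD);
    double t_comm = MPI_Wtime() - t0;

    // Time the computation phase (dummy work standing in for the filter)
    t0 = MPI_Wtime();
    long sum = 0;
    for (int i = 0; i < chunk; ++i) sum += recvbuf[i];
    double t_comp = MPI_Wtime() - t0;

    // Report the slowest process for each phase
    double comm_max, comp_max;
    MPI_Reduce(&t_comm, &comm_max, 1, MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD);
    MPI_Reduce(&t_comp, &comp_max, 1, MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD);
    if (rank == 0)
        printf("comm: %.6f s  compute: %.6f s (sum=%ld)\n", comm_max, comp_max, sum);

    MPI_Finalize();
    return 0;
}
Reducing the per-process timings with MPI_MAX reports the slowest process, which is what determines the overall parallel runtime.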
Summary
1. Compared with the previous post, the program replaces the point-to-point communication with collective communication. The original point-to-point version:
if (!rank) {  // rank 0 sends each of the other processes its block of the data set
    for (i = 0; i < bandcount; i++)
    {
        for (j = nImgSizeX * nImgSizeY / size; j < nImgSizeX * nImgSizeY; j++)
        {
            MPI_Send(&pImgData[i][j], 1, MPI_UNSIGNED_CHAR, j / (nImgSizeX * nImgSizeY / size), 99, MPI_COMM_WORLD);
        }
    }
}
else {  // the other processes receive their data from rank 0
    for (i = 0; i < bandcount; i++) {
        for (j = 0; j < nImgSizeX * nImgSizeY / size; j++) {
            MPI_Recv(&data[i][j], 1, MPI_UNSIGNED_CHAR, 0, 99, MPI_COMM_WORLD, &status);
        }
    }
}
was changed to:
// Rank 0 distributes the data to all processes with the collective MPI_Scatter()
for (i = 0; i < bandcount; i++)
    MPI_Scatter(pImgData[i], nImgSizeX * nImgSizeY / size, MPI_UNSIGNED_CHAR,
                data[i], nImgSizeX * nImgSizeY / size, MPI_UNSIGNED_CHAR, 0, MPI_COMM_WORLD);
2. Collective communication greatly reduces the communication time. The collective functions used in this example are MPI_Scatter() and MPI_Gather(), both of which move the same amount of data for every process. If the data is not split evenly, MPI_Scatterv() and MPI_Gatherv() can be used instead: their parameter lists include an array of counts that specifies how much data each process sends or receives (see the official MPI documentation for the full signatures). A sketch of that variant follows.
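The following is a minimal sketch, not taken from the original program, of how MPI_Scatterv()/MPI_Gatherv() could distribute the rows of one band when nImgSizeY is not divisible by the number of processes. The image dimensions here are made up for illustration and the filtering step is omitted:
#include <mpi.h>
#include <vector>

int main(int argc, char *argv[])
{
    MPI_Init(&argc, &argv);
    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    const int nImgSizeX = 1000, nImgSizeY = 1003;   // example size, not divisible by size

    // Per-process element counts and displacements, measured in pixels
    std::vector<int> counts(size), displs(size);
    int offsetRows = 0;
    for (int r = 0; r < size; ++r) {
        int rows = nImgSizeY / size + (r < nImgSizeY % size ? 1 : 0);  // spread the remainder rows
        counts[r] = rows * nImgSizeX;
        displs[r] = offsetRows * nImgSizeX;
        offsetRows += rows;
    }

    std::vector<unsigned char> image;               // full band, significant only on rank 0
    if (rank == 0) image.resize((size_t)nImgSizeX * nImgSizeY);
    std::vector<unsigned char> local(counts[rank]); // this process's block of rows

    MPI_Scatterv(image.data(), counts.data(), displs.data(), MPI_UNSIGNED_CHAR,
                 local.data(), counts[rank], MPI_UNSIGNED_CHAR, 0, MPI_COMM_WORLD);

    // ... filter the local block here ...

    // Gather the unevenly sized results back with the same counts/displacements
    std::vector<unsigned char> output;
    if (rank == 0) output.resize((size_t)nImgSizeX * nImgSizeY);
    MPI_Gatherv(local.data(), counts[rank], MPI_UNSIGNED_CHAR,
                output.data(), counts.data(), displs.data(), MPI_UNSIGNED_CHAR,
                0, MPI_COMM_WORLD);

    MPI_Finalize();
    return 0;
}
The counts and displacements are plain int arrays, as these MPI calls require, so this assumes each band fits within int-sized element counts.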
3. If I keep studying MPI in the future, I will continue to write up what I learn. If you have any questions, feel free to contact me at aaa@qq.com.