OpenMP Parallel Programming: Matrix Multiplication, Integral Computation, PSRS Sorting


Introduction

Parallel computers can be roughly divided into shared-memory and distributed-memory machines. In a shared-memory machine, multiple cores share a single memory; today's PCs are of this kind (whether they carry one multi-core CPU or several CPU sockets, all the cores see one memory). Large computers typically combine the two structures: memory is shared within each compute node and distributed across nodes. To get good performance out of such machines, parallel programming is a necessity. The mainstream approach is to use MPI on distributed-memory structures and Pthreads or OpenMP on shared-memory structures.

Compared with Pthreads, OpenMP is far simpler. For those of us who focus on algorithms and only need the most basic control over how threads interact (synchronization, mutual exclusion, and so on), OpenMP is a perfect fit.

Basic Usage

Using OpenMP on Linux is very simple: include the header and add a compiler flag.

Header file

#include <omp.h>

To compile (-fopenmp is the flag that enables OpenMP; -lbenchmark -lpthread appear here only because this example links Google Benchmark):

g++ calcTrap.cpp -o trap -lbenchmark -lpthread -fopenmp
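As a quick sanity check, a minimal program like the following (the file name is arbitrary) compiles with g++ hello.cpp -o hello -fopenmp and prints one line per thread:

#include <cstdio>
#include <omp.h>

int main() {
    // The block below is executed once by every thread in the team.
    #pragma omp parallel
    {
        std::printf("Hello from thread %d of %d\n",
                    omp_get_thread_num(), omp_get_num_threads());
    }
    return 0;
}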

Key Syntax

Compiler directives (a combined usage sketch follows the list):

#pragma omp parallel for
#pragma omp for reduction(+: var)
#pragma omp critical // lock: only one thread at a time may enter the block
{
}
#pragma omp parallel for private(x,y) // each thread gets an independent copy of x and y; without this, variables default to shared
#pragma omp parallel for schedule(static/dynamic/guided, k) // split the total work into n/k chunks, then schedule them across threads
#pragma omp parallel sections
{
    #pragma omp section // the code under each section must have no data dependences on the other sections
    .........
    #pragma omp section
    .........        
}
#pragma omp parallel
{
    .......();
    #pragma omp master/single // only the master thread / exactly one thread runs the block; master implies no barrier, while with single the threads that finish first wait for the block to complete
    {
    }
    .......
}

#pragma omp barrier/nowait // force a barrier / skip the implied one: nowait is a clause, usable when subsequent code has no dependence on the preceding parallel work
#pragma omp parallel for firstprivate(var)/lastprivate(var) // initialize each thread's copy from the master's value / copy the last iteration's value back to the master variable after the loop
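To make the directives above concrete, here is a small self-contained sketch (array contents are arbitrary) combining for, schedule, nowait, and single:

#include <cstdio>
#include <omp.h>

int main() {
    const int n = 8;
    int a[n], b[n];

    #pragma omp parallel
    {
        // Chunks of 2 iterations are handed out dynamically; nowait lets
        // threads move on to the next loop without a barrier in between.
        #pragma omp for schedule(dynamic, 2) nowait
        for (int i = 0; i < n; ++i) a[i] = i * i;

        // Safe despite nowait above, because b[] does not depend on a[].
        #pragma omp for
        for (int i = 0; i < n; ++i) b[i] = i + 1;

        // Exactly one thread prints; the others wait at single's implied barrier.
        #pragma omp single
        std::printf("filled two arrays with %d threads\n", omp_get_num_threads());
    }
    return 0;
}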

Library functions (a usage sketch follows):

int omp_get_num_threads(void);  // returns the number of threads in the current team
int omp_get_thread_num(void);   // returns the calling thread's ID
int omp_get_num_procs(void);    // returns the number of available processor cores
void omp_set_num_threads(int thread_num); // sets the number of threads to use
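A short sketch of these library functions in action (note that omp_get_num_threads() returns 1 outside a parallel region):

#include <cstdio>
#include <omp.h>

int main() {
    // Use as many threads as there are available processor cores.
    omp_set_num_threads(omp_get_num_procs());

    // Outside a parallel region the team size is 1.
    std::printf("serial part: %d thread(s)\n", omp_get_num_threads());

    #pragma omp parallel
    {
        #pragma omp master
        std::printf("parallel part: %d thread(s) on %d core(s)\n",
                    omp_get_num_threads(), omp_get_num_procs());
    }
    return 0;
}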

How OpenMP Works

OpenMP consists of Compiler Directives and Run-time Library Functions, supplemented by a few Environment Variables, Data Types, and the _OPENMP macro. OpenMP deserves to be called simple because all of these together number only about 50, and the OpenMP 2.0 Specification runs to little more than 100 pages. For example, #pragma omp parallel is a compiler directive: the statement block that follows it is executed by multiple threads (that is, executed more than once). omp_get_thread_num() is a run-time library function: it returns the ID of the thread executing the current code.
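The _OPENMP macro makes it easy to keep code buildable even without OpenMP support; a minimal sketch:

#include <cstdio>

#ifdef _OPENMP
#include <omp.h>
#endif

int main() {
#ifdef _OPENMP
    std::printf("compiled with OpenMP %d\n", _OPENMP);  // expands to a yyyymm version date
#else
    std::printf("compiled without OpenMP, running serially\n");
#endif
    return 0;
}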

The basic idea of a parallel program on a shared-memory computer is to use multiple threads, distributing the parallelizable workload over several physical cores so as to shorten execution time (and raise CPU utilization). The standard pattern for shared-memory parallel programs is fork/join parallelism, whose basic model is shown in the figure below:

[Figure: the fork/join parallel execution model]

Hands-on Examples

Integral Computation

double Trap(double a, double b, int n, double(*f)(double), int chunk){
    double integral, h;
    h = (b-a)/n;                    // width of each trapezoid
    integral = (f(a) + f(b))/2.0;   // the two endpoint terms count half

// Each thread accumulates into its own private copy of integral;
// the copies are summed by the reduction when the loop ends.
#pragma omp parallel for reduction(+:integral) schedule(dynamic, chunk)
    for (int k = 1; k <= n-1 ; ++k) {
        integral += f(a+k*h);
    }
    integral = integral*h;

    return integral;
}
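A hypothetical driver for Trap (the integrand and parameters here are purely illustrative): since the integral of x^2 over [0, 3] is 9, the output should be close to 9.

#include <cstdio>

double square(double x) { return x * x; }

int main() {
    // 1e6 trapezoids, dynamic chunks of 1000 iterations per dispatch.
    double result = Trap(0.0, 3.0, 1000000, square, 1000);
    std::printf("integral of x^2 over [0,3] = %f (exact: 9)\n", result);
    return 0;
}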

Note that when using OpenMP, updates inside a critical region must be synchronized. There are three ways to do this, compared in the sketch below this list:

  • atomic operations: only for simple updates such as ++, --, ^=
#pragma omp atomic
  • the critical directive: works for any critical section, but costs more than an atomic operation
#pragma omp critical
  • the reduction clause: only supports reduction operators such as +, *, &, |, &&, ||
#pragma omp parallel for reduction(+:integral)
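The three mechanisms give the same answer at very different cost; a minimal sketch comparing them on a shared counter (variable names are illustrative):

#include <cstdio>
#include <omp.h>

int main() {
    const int n = 100000;
    int atomic_sum = 0, critical_sum = 0, reduction_sum = 0;

    #pragma omp parallel for
    for (int i = 0; i < n; ++i) {
        #pragma omp atomic      // cheapest: a single hardware update
        ++atomic_sum;

        #pragma omp critical    // heaviest: a full lock around the block
        ++critical_sum;
    }

    #pragma omp parallel for reduction(+: reduction_sum)
    for (int i = 0; i < n; ++i)
        ++reduction_sum;        // each thread sums privately, then the copies are combined

    std::printf("%d %d %d\n", atomic_sum, critical_sum, reduction_sum);
    return 0;
}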

Matrix-Vector Multiplication

void Mat_vect_mult(
                    int A[],   // m x n matrix, row-major
                    int x[],   // input vector, length n
                    int y[],   // output vector, length m: y = A*x
                    const int &m,
                    const int &n,
                    int chunk
        ){
    int i, j;
// Parallelize the outer loop only: each thread owns whole rows of y, so no
// synchronization is needed. A nested parallel for on the inner loop would
// race on y[i] and oversubscribe threads, so j is made private and the
// inner loop stays serial.
#pragma omp parallel for shared(A, x, y) private(i, j) schedule(dynamic, chunk)
    for (i = 0; i < m; ++i) {
        y[i] = 0;
        for (j = 0; j < n; ++j) {
            y[i] += A[i*n + j]*x[j];
        }
    }
}
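A hypothetical driver for the routine above (the matrix and vector values are chosen purely for illustration):

#include <cstdio>

int main() {
    const int m = 2, n = 3;
    int A[m * n] = {1, 2, 3,
                    4, 5, 6};          // 2x3 matrix, row-major
    int x[n] = {1, 1, 1};
    int y[m];

    Mat_vect_mult(A, x, y, m, n, 1);   // expect y = {6, 15}
    std::printf("y = {%d, %d}\n", y[0], y[1]);
    return 0;
}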

PSRS Sorting (Parallel Sorting by Regular Sampling)

void Merge(int *arr, int left, int mid, int right){
    int len = right - left + 1;
    int *temp = new int[len];
    int lhead = left, rhead = mid+1;
    for (int i = 0; i < len; ++i) {
        if (lhead == mid+1){
            for (int j = rhead; j <= right; ++j) {
                temp[i++] = arr[j];
            }
            break;
        }
        else if (rhead == right+1){
            for (int j = lhead; j <= mid; ++j) {
                temp[i++] = arr[j];
            }
            break;
        }

        if (arr[lhead] > arr[rhead]){
            temp[i] = arr[rhead];
            ++rhead;
        }
        else{
            temp[i] = arr[lhead];
            ++lhead;
        }
    }
    for (int i = 0; i < len; ++i) {
        arr[left + i] = temp[i];
    }
    delete [] temp;  // temp was allocated with new[], so delete[] is required
}

/*
 * Sorts ascending by default.
 */
void MergeSort(int *arr, int left, int right){
    if (left == right ) return;

    int mid = (left + right)/2;

    MergeSort(arr, left, mid);
    MergeSort(arr, mid+1, right);
    Merge(arr, left, mid, right);
}

void PSRS(int *arr, int length, int thread_num){
    if(thread_num == 1){
        MergeSort(arr, 0, length-1);
        return ;
    }
    int id, sample_index = 0;
    int base = length / thread_num; // elements per segment
    int *regular_sample = new int[thread_num*thread_num];
    int *pivot = new int[thread_num-1];  // pivots
//    int **pos = new int[thread_num][thread_num-1];  // per-thread partition points after splitting by pivots
//    int (*pos)[10] = new int[thread_num][10];  // per-thread partition points: index of the first element greater than each pivot
    int **pos = new int* [thread_num];
    for (int i = 0; i < thread_num; ++i) {
        pos[i] = new int[thread_num-1];
    }
    int *temp_arr = new int[length]; // scratch buffer for the global exchange
    int *chunk_size = new int[thread_num];
    omp_set_num_threads(thread_num);

#pragma omp parallel shared(arr, length, thread_num, base, regular_sample) private(id)
    {
        // Phase 1: sort each thread's local segment
        id = omp_get_thread_num();
        assert(id != thread_num);
        if (id == thread_num - 1) {
            MergeSort(arr, id * base, length-1);
        }
        else MergeSort(arr, id * base, (id + 1) * base - 1);

        // Phase 2: regular sampling (critical protects the shared sample_index)
#pragma omp critical
        for (int j = 0; j < thread_num; ++j)
            regular_sample[sample_index++] = arr[j * base / thread_num + id * base];

// synchronize all threads
#pragma omp barrier
        assert(sample_index == thread_num * thread_num);
#pragma omp master
        {
            printArr(arr, length, "Local Sorted result:");
            MergeSort(regular_sample, 0, sample_index - 1);
            printArr(regular_sample, thread_num * thread_num, "Regular Sampling after sort:");
            // Phase 3: select pivots from the sorted sample
            for (int j = 0; j < thread_num - 1; ++j) {
                pivot[j] = regular_sample[(j + 1) * thread_num];
            }
            printArr(pivot, thread_num-1, "Select Pivot result:");
        }
#pragma omp barrier
        // Phase 4: partition each local segment by the pivots
        int left = id * base, right = (id + 1) * base - 1, pivot_index = 0;
        if (id == thread_num - 1) right = length - 1;

        for (int j = left; j <= right; ++j) {
            if (pivot_index == thread_num - 1) break;

            if (arr[j] > pivot[pivot_index]) {
                pos[id][pivot_index++] = j;
                continue;
            }
            while (j == right && pivot_index != thread_num - 1)
                pos[id][pivot_index++] = j;
        }

    }
    for (int k = 0; k < thread_num; ++k) {
        DEBUG("Thread id = %d: Pivot divide point", k);
        printArr(pos[k], thread_num-1, "");
    }

    // Phase 5: global exchange
    assert(thread_num >= 2);
    int start_index = 0, cpleft, cpright;
    // copy the left end (elements below the first pivot) of each segment
    for (int i = 0; i < thread_num; ++i) {
        cpleft = i*base;
        cpright = pos[i][0] - 1;
        memcpy(temp_arr+start_index, arr + cpleft, sizeof(int)*(cpright - cpleft + 1));
        start_index += cpright - cpleft + 1;
    }
    chunk_size[0] = start_index;
    printArr(temp_arr, length, "Copy left end");
    // middle part of each segment:
    // iterate over pivot intervals 1 .. thread_num-2 of pos
    for (int i = 1; i < thread_num-1; ++i) {
        // iterate over all thread_num segments
        for (int j = 0; j < thread_num; ++j) {
            cpleft = pos[j][i-1];
            cpright = pos[j][i] - 1;
            memcpy(temp_arr+start_index, arr + cpleft, sizeof(int)*(cpright - cpleft + 1));
            start_index += cpright - cpleft + 1;
        }
        chunk_size[i] = start_index;
    }
    printArr(temp_arr, length, "Copy mid");
    // copy the right end (elements above the last pivot) of each segment;
    // the last segment extends to length-1 when length % thread_num != 0
    for (int i = 0; i < thread_num; ++i) {
        cpleft = pos[i][thread_num-2];
        cpright = (i == thread_num-1) ? length - 1 : (i+1)*base - 1;
        memcpy(temp_arr+start_index, arr + cpleft, sizeof(int)*(cpright - cpleft + 1));
        start_index += cpright - cpleft + 1;
    }
    chunk_size[thread_num-1] = start_index;
    printArr(temp_arr, length, "Copy right end");
    assert(start_index == length);

#pragma omp parallel shared(temp_arr, chunk_size) private(id)
    {
        id = omp_get_thread_num();
        assert(id != thread_num);
        if (id == 0)    MergeSort(temp_arr, 0, chunk_size[0]-1);
        else MergeSort(temp_arr, chunk_size[id-1], chunk_size[id]-1);
    }
    // (no explicit barrier needed here: the parallel region above ends with an implied one)
    memcpy(arr, temp_arr, sizeof(int)*length);
    delete []temp_arr;
    delete []regular_sample;
    delete []pivot;
    delete []chunk_size;
    for (int i = 0; i < thread_num; ++i) {
        delete []pos[i];
    }
    delete []pos;  // also free the array of row pointers itself
    printArr(arr, length, "*********PSRS Sort Final Result*********");
}
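The listing above relies on printArr and DEBUG helpers (plus assert and memcpy) that live in the full repository; a hypothetical driver with simplified stand-ins for those helpers (placed above the PSRS code in the same file) might look like this:

#include <cassert>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <omp.h>

// Simplified stand-ins for the repository's helpers (assumptions, not the originals).
void printArr(int *arr, int n, const char *msg) {
    std::printf("%s\n", msg);
    for (int i = 0; i < n; ++i) std::printf("%d ", arr[i]);
    std::printf("\n");
}
#define DEBUG(fmt, ...) std::printf(fmt "\n", __VA_ARGS__)

int main() {
    const int length = 27, threads = 3;
    int arr[length];
    std::srand(42);                      // fixed seed so runs are reproducible
    for (int i = 0; i < length; ++i) arr[i] = std::rand() % 100;

    printArr(arr, length, "Input:");
    PSRS(arr, length, threads);          // sorts arr in place
    return 0;
}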

Full source code on GitHub: https://github.com/ZezhongWang/BlogCode
