OpenMP Parallel Programming: Matrix Multiplication, Integral Calculation, and PSRS Sorting
Introduction
Parallel computers can be broadly divided into shared-memory and distributed-memory machines. In a shared-memory machine, multiple cores share one memory; today's PCs are of this type (whether they have a single multi-core CPU or several CPU sockets, all the cores see one memory). Large-scale machines typically combine the two structures: memory is shared within each compute node and distributed across nodes. To get good performance out of such machines, parallel programming is a necessity. The mainstream approach today is to use MPI on distributed-memory systems and Pthreads or OpenMP on shared-memory systems.
Compared with Pthreads, OpenMP is much simpler. For those of us who care mainly about the algorithm and only need basic control over the relationships between threads (synchronization, mutual exclusion, and so on), OpenMP is a perfect fit.
Basic Usage
Using OpenMP on Linux is very simple: include the header file and add the OpenMP compiler flag.
Header file:
#include <omp.h>
When compiling, add -fopenmp (the -lbenchmark -lpthread flags below are only there because the examples are timed with a benchmarking library):
g++ calcTrap.cpp -o trap -lbenchmark -lpthread -fopenmp
Key Syntax
Compiler directives:
#pragma omp parallel for
#pragma omp for reduction(+: var)
#pragma omp critical // critical section: mutual exclusion (a lock)
{
}
#pragma omp parallel for private(x, y) // each thread gets its own independent copy of x and y; without this clause variables are shared by default
#pragma omp parallel for schedule(static/dynamic/guided, k) // split the total work into n/k chunks of size k, then schedule them across the threads
#pragma omp parallel sections
{
#pragma omp section // the functions under the different sections must have no data dependencies on one another
.........
#pragma omp section
.........
}
#pragma omp parallel
{
.......();
#pragma omp master/single // only the master thread / exactly one thread executes the block below; the difference is that master has no implied barrier, while with single the threads that finish first wait for the ones that have not
{
}
.......
}
#pragma omp barrier // force a barrier; conversely, the nowait clause removes the implied barrier of a worksharing construct and can be used when the code that follows does not depend on the preceding parallel work
#pragma omp parallel for firstprivate(var)/lastprivate(var) // initialize each thread's private copy from the value before the loop / copy the value from the last iteration back to the master thread after the loop
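Several of these directives can be combined in one program. The sketch below is not taken from the original post (demo and n are placeholder names); it simply shows parallel sections, firstprivate, single, and a nowait worksharing loop together:
#include <cstdio>
#include <omp.h>

void demo(int n) {
    int x = 1;
    // two independent tasks executed as parallel sections
    #pragma omp parallel sections
    {
        #pragma omp section
        printf("section A on thread %d\n", omp_get_thread_num());
        #pragma omp section
        printf("section B on thread %d\n", omp_get_thread_num());
    }
    // firstprivate: every thread starts with its own copy of x, initialized to 1
    #pragma omp parallel firstprivate(x)
    {
        x += omp_get_thread_num();
        #pragma omp single
        printf("printed by exactly one thread (its private x is %d)\n", x);
        // nowait: skip the barrier implied by the loop, since nothing afterwards depends on it
        #pragma omp for schedule(dynamic, 4) nowait
        for (int i = 0; i < n; ++i) {
            /* independent per-iteration work would go here */
        }
    }
}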
Library functions:
int omp_get_num_threads(void);   // returns the number of threads in the current team
int omp_get_thread_num(void);    // returns the current thread's number
int omp_get_num_procs(void);     // returns the number of available processor cores
void omp_set_num_threads(int thread_num); // sets the number of threads to use (e.g. 2, 3, ...)
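One gotcha worth noting: omp_get_num_threads() reports the size of the current team, so outside a parallel region it returns 1. A small sketch (not from the original post) to illustrate:
#include <cstdio>
#include <omp.h>

int main() {
    printf("available cores: %d\n", omp_get_num_procs());
    printf("threads outside a parallel region: %d\n", omp_get_num_threads()); // always 1
    omp_set_num_threads(4);
    #pragma omp parallel
    {
        #pragma omp single
        printf("threads inside the parallel region: %d\n", omp_get_num_threads()); // 4
    }
    return 0;
}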
How OpenMP Works
OpenMP consists of Compiler Directives and Run-time Library Functions, together with a handful of related Environment Variables, Data Types, and the _OPENMP macro. The reason OpenMP can be called simple is that all of these add up to only about 50 items in total, and the OpenMP 2.0 Specification is only a little over 100 pages long. Take a minimal "first OpenMP program" (a sketch is given at the end of this section): #pragma omp parallel is a Compiler Directive, and the statement block following it is executed by multiple threads (that is, executed more than once); omp_get_thread_num() is a Run-time Library Function, and it returns the number of the thread executing the current code.
The basic approach to parallel programs on a shared-memory machine is to use multiple threads, so that the parallelizable work is spread across several physical cores, shortening execution time (and raising CPU utilization). The standard pattern in shared-memory parallel programs is fork/join parallelism: the master thread forks a team of threads at the start of each parallel region, the team executes the region concurrently, and the threads join back into the master thread at the end of the region.
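A minimal sketch of such a "first OpenMP program" (the original listing is not reproduced here, so this is a reconstruction): every thread in the forked team runs the block and reports its own number, then the program joins back to a single thread.
#include <cstdio>
#include <omp.h>

int main() {
    // fork: the block below is executed once by every thread in the team
    #pragma omp parallel
    {
        printf("Hello from thread %d of %d\n",
               omp_get_thread_num(), omp_get_num_threads());
    }
    // join: from here on only the master thread continues
    return 0;
}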
Hands-On Examples
Integral Calculation
// Trapezoidal rule: integrate f over [a, b] using n subintervals of width h.
double Trap(double a, double b, int n, double (*f)(double), int chunk){
    double integral, h;
    h = (b-a)/n;
    integral = (f(a) + f(b))/2.0;
    // Each thread accumulates into its own private copy of integral;
    // reduction(+:integral) sums the copies when the loop ends.
    #pragma omp parallel for reduction(+:integral) schedule(dynamic, chunk)
    for (int k = 1; k <= n-1 ; ++k) {
        integral += f(a+k*h);
    }
    integral = integral*h;
    return integral;
}
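A quick way to try it out (a sketch that assumes the Trap function above is in scope; square and the parameter values are made up for illustration): the integral of x^2 over [0, 3] should come out close to 9.
#include <cstdio>

double square(double x) { return x * x; }

int main() {
    // 1,000,000 trapezoids, handed out to the threads in dynamic chunks of 1000 iterations
    double result = Trap(0.0, 3.0, 1000000, square, 1000);
    printf("integral of x^2 over [0,3] ~= %f (exact value: 9)\n", result);
    return 0;
}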
Note that when using OpenMP, concurrent updates to shared data (a critical section) must be synchronized. There are three ways to do this:
- With an atomic operation: only suitable for simple updates such as ++, --, or compound assignments like ^=
#pragma omp atomic
- With the critical directive: works for any critical section, but is more expensive than an atomic operation
#pragma omp critical
- With a reduction clause: restricted to the built-in reduction operators (+, *, -, &, |, ^, &&, ||)
#pragma omp parallel for reduction(+:integral)
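As a concrete comparison, the three functions below all compute the same sum safely: atomic is cheap but limited to simple updates, critical is general but serializes every update, and reduction is usually fastest because each thread accumulates privately. This is a sketch for illustration, not code from the original post.
#include <omp.h>

long sum_atomic(const int *a, int n) {
    long s = 0;
    #pragma omp parallel for
    for (int i = 0; i < n; ++i) {
        #pragma omp atomic           // only simple updates like += are allowed here
        s += a[i];
    }
    return s;
}

long sum_critical(const int *a, int n) {
    long s = 0;
    #pragma omp parallel for
    for (int i = 0; i < n; ++i) {
        #pragma omp critical         // arbitrary code allowed, but every update is serialized
        { s += a[i]; }
    }
    return s;
}

long sum_reduction(const int *a, int n) {
    long s = 0;
    #pragma omp parallel for reduction(+:s)  // private partial sums, combined at the end
    for (int i = 0; i < n; ++i)
        s += a[i];
    return s;
}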
Matrix-Vector Multiplication
void Mat_vect_mult(
        int A[],
        int x[],
        int y[],
        const int &m,
        const int &n,
        int chunk
){
    int i, j;
    // Parallelize the outer loop only: each thread computes whole rows of y,
    // so no two threads ever write the same y[i]; both loop indices are private.
    // A nested parallel for on the inner loop would race on y[i] without a reduction.
    #pragma omp parallel for shared(A, x, y, m, n) private(i, j) schedule(dynamic, chunk)
    for (i = 0; i < m; ++i) {
        y[i] = 0;
        for (j = 0; j < n; ++j) {
            y[i] += A[i*n + j]*x[j];
        }
    }
}
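A small test driver (a sketch assuming Mat_vect_mult above is in scope; the 3x3 identity matrix and the vector are made-up values): multiplying by the identity should return the input vector unchanged.
#include <cstdio>

int main() {
    const int m = 3, n = 3;
    int A[m * n] = {1, 0, 0,    // row-major 3x3 identity matrix
                    0, 1, 0,
                    0, 0, 1};
    int x[n] = {7, 8, 9};
    int y[m];
    Mat_vect_mult(A, x, y, m, n, 1);
    printf("%d %d %d\n", y[0], y[1], y[2]);  // expected: 7 8 9
    return 0;
}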
PSRS Sorting
PSRS (Parallel Sorting by Regular Sampling) proceeds in phases: each thread first sorts its own block of the array, regular samples taken from every block are sorted to choose thread_num-1 pivots, every block is then partitioned by those pivots and the pieces are exchanged so that each thread ends up with one pivot interval, and finally each thread sorts the partition it received.
#include <cassert>   // assert (used by PSRS below)
#include <cstring>   // memcpy (used in the global exchange)
#include <omp.h>

// Merge the two sorted halves arr[left..mid] and arr[mid+1..right] in place.
void Merge(int *arr, int left, int mid, int right){
    int len = right - left + 1;
    int *temp = new int[len];
    int lhead = left, rhead = mid+1;
    for (int i = 0; i < len; ++i) {
        if (lhead == mid+1){                // left half exhausted: copy the rest of the right half
            for (int j = rhead; j <= right; ++j) {
                temp[i++] = arr[j];
            }
            break;
        }
        else if (rhead == right+1){         // right half exhausted: copy the rest of the left half
            for (int j = lhead; j <= mid; ++j) {
                temp[i++] = arr[j];
            }
            break;
        }
        if (arr[lhead] > arr[rhead]){
            temp[i] = arr[rhead];
            ++rhead;
        }
        else{
            temp[i] = arr[lhead];
            ++lhead;
        }
    }
    for (int i = 0; i < len; ++i) {
        arr[left + i] = temp[i];
    }
    delete [] temp;                         // an array allocated with new[] needs delete[]
}
/*
 * Sorts in ascending order by default
 */
void MergeSort(int *arr, int left, int right){
    if (left >= right) return;          // empty or single-element range: nothing to do
    int mid = (left + right)/2;
    MergeSort(arr, left, mid);
    MergeSort(arr, mid+1, right);
    Merge(arr, left, mid, right);
}
// printArr() and DEBUG() are logging helpers from the full code in the repository linked below.
void PSRS(int *arr, int length, int thread_num){
    if(thread_num == 1){
        MergeSort(arr, 0, length-1);
        return;
    }
    int id, sample_index = 0;
    int base = length / thread_num;                        // number of elements per block
    int *regular_sample = new int[thread_num*thread_num];  // regular samples
    int *pivot = new int[thread_num-1];                    // pivots
    // pos[i][j]: for thread i, the index of the first element greater than pivot[j]
    int **pos = new int* [thread_num];
    for (int i = 0; i < thread_num; ++i) {
        pos[i] = new int[thread_num-1];
    }
    int *temp_arr = new int[length];                       // scratch array for the global exchange
    int *chunk_size = new int[thread_num];                 // end offsets of the exchanged partitions
    omp_set_num_threads(thread_num);
    #pragma omp parallel shared(arr, length, thread_num, base, regular_sample) private(id)
    {
        // Phase 1: each thread sorts its own block locally
        id = omp_get_thread_num();
        assert(id != thread_num);
        if (id == thread_num - 1) {
            MergeSort(arr, id * base, length-1);            // the last block also takes the remainder
        }
        else MergeSort(arr, id * base, (id + 1) * base - 1);
        // Phase 2: regular sampling (thread_num samples from each block)
        #pragma omp critical
        for (int j = 0; j < thread_num; ++j)
            regular_sample[sample_index++] = arr[j * base / thread_num + id * base];
        // wait until every thread has contributed its samples
        #pragma omp barrier
        assert(sample_index == thread_num * thread_num);
        #pragma omp master
        {
            printArr(arr, length, "Local Sorted result:");
            // Phase 3: sort the samples and pick thread_num-1 pivots
            MergeSort(regular_sample, 0, sample_index - 1);
            printArr(regular_sample, thread_num * thread_num, "Regular Sampling after sort:");
            for (int j = 0; j < thread_num - 1; ++j) {
                pivot[j] = regular_sample[(j + 1) * thread_num];
            }
            printArr(pivot, thread_num-1, "Select Pivot result:");
        }
        #pragma omp barrier
        // Phase 4: partition this thread's block by the pivots.
        // pos[id][k] becomes the index of the first element greater than pivot[k]
        // (right + 1 if every element of the block is <= pivot[k]).
        int left = id * base, right = (id + 1) * base - 1;
        if (id == thread_num - 1) right = length - 1;
        for (int k = 0, j = left; k < thread_num - 1; ++k) {
            while (j <= right && arr[j] <= pivot[k]) ++j;
            pos[id][k] = j;
        }
    }
    for (int k = 0; k < thread_num; ++k) {
        DEBUG("Thread id = %d: Pivot divide point", k);
        printArr(pos[k], thread_num-1, "");
    }
    // Global exchange: for every pivot interval, gather the matching piece of every block
    assert(thread_num >= 2);
    int start_index = 0, cpleft, cpright;
    // copy the left end of every block (elements <= pivot[0])
    for (int i = 0; i < thread_num; ++i) {
        cpleft = i*base;
        cpright = pos[i][0] - 1;
        memcpy(temp_arr+start_index, arr + cpleft, sizeof(int)*(cpright - cpleft + 1));
        start_index += cpright - cpleft + 1;
    }
    chunk_size[0] = start_index;
    printArr(temp_arr, length, "Copy left end");
    // copy the middle pieces: for pivot interval i (1 <= i <= thread_num-2),
    // walk over all thread_num blocks and copy the elements between pivot[i-1] and pivot[i]
    for (int i = 1; i < thread_num-1; ++i) {
        for (int j = 0; j < thread_num; ++j) {
            cpleft = pos[j][i-1];
            cpright = pos[j][i] - 1;
            memcpy(temp_arr+start_index, arr + cpleft, sizeof(int)*(cpright - cpleft + 1));
            start_index += cpright - cpleft + 1;
        }
        chunk_size[i] = start_index;
    }
    printArr(temp_arr, length, "Copy mid");
    // copy the right end of every block (elements > pivot[thread_num-2])
    for (int i = 0; i < thread_num; ++i) {
        cpleft = pos[i][thread_num-2];
        cpright = (i == thread_num-1) ? length-1 : (i+1)*base - 1;  // the last block also covers the remainder
        memcpy(temp_arr+start_index, arr + cpleft, sizeof(int)*(cpright - cpleft + 1));
        start_index += cpright - cpleft + 1;
    }
    chunk_size[thread_num-1] = start_index;
    printArr(temp_arr, length, "Copy right end");
    assert(start_index == length);
    // Phase 5: each thread merge-sorts the partition it received
    #pragma omp parallel shared(temp_arr, chunk_size) private(id)
    {
        id = omp_get_thread_num();
        assert(id != thread_num);
        if (id == 0) MergeSort(temp_arr, 0, chunk_size[0]-1);
        else MergeSort(temp_arr, chunk_size[id-1], chunk_size[id]-1);
    }
    // the parallel region above ends with an implied barrier, so all partitions are sorted here
    memcpy(arr, temp_arr, sizeof(int)*length);
    delete []temp_arr;
    delete []regular_sample;
    delete []pivot;
    delete []chunk_size;
    for (int i = 0; i < thread_num; ++i) {
        delete []pos[i];
    }
    delete []pos;
    printArr(arr, length, "*********PSRS Sort Final Result*********");
}
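Finally, a small driver sketch for the sort. printArr and DEBUG are logging helpers defined in the repository linked below; the stub versions here are assumptions so the snippet is self-contained, and in a single file they would have to appear before PSRS.
#include <cstdio>
#include <cstdlib>

// stand-in helpers for the versions in the repository
#define DEBUG(fmt, ...) printf(fmt "\n", ##__VA_ARGS__)
void printArr(int *arr, int length, const char *msg) {
    printf("%s\n", msg);
    for (int i = 0; i < length; ++i) printf("%d ", arr[i]);
    printf("\n");
}

int main() {
    const int n = 32;
    int arr[n];
    for (int i = 0; i < n; ++i) arr[i] = rand() % 100;  // random test data
    PSRS(arr, n, 4);                                    // sort with 4 threads
    return 0;
}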
Full code is available on GitHub: https://github.com/ZezhongWang/BlogCode