[Parallel and Distributed Programming] Example parallel programs in C++ using the pthread library
程序员文章站
2022-06-03 21:21:45
...
Problem description
Using the formula above, write programs that compute PI at several problem sizes, and compare their running times.
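The formula referred to above is missing from this copy of the post. Judging from the code that follows, it is the Leibniz series for π, truncated after n terms (`factor` plays the role of (-1)^k):

```latex
\pi = 4\sum_{k=0}^{\infty}\frac{(-1)^k}{2k+1} \approx 4\sum_{k=0}^{n-1}\frac{(-1)^k}{2k+1}
```

For even n the truncated value falls short of π by roughly 1/n. This agrees with the outputs below: for n = 200000, 3.1415876535898 is about 5×10^-6 below π.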
Serial version
#include <iostream>
#include <ctime>
#include <cmath>
using namespace std;
int main() {
    clock_t begin, end;
    float total;
    double pi_approx;
    // n = 2e5, 2e6, ..., 2e9 terms (long is assumed to be 64-bit, as on Linux/macOS)
    for (long n=2*pow(10,5); n<pow(10,10); n*=10)
    {
        double sum = 0.0;
        double factor = 1.0;   // (-1)^k, flipped after every term
        begin = clock();
        // k must be long: with an int counter, 2*k+1 overflows once k exceeds about 1.07e9
        for (long k=0; k<n; k++, factor=-factor) {
            sum += factor/(2*k+1);
        }
        pi_approx = 4 * sum;
        end = clock();
        total = (float)(end-begin)*1000 / CLOCKS_PER_SEC;   // processor time in ms
        cout.width(14);
        cout.setf(ios::left);
        cout.precision(14);
        cout<<n<<"pi: "<<pi_approx<<" time:"<<total<<"ms"<<endl;
    }
    return 0;
}
200000 pi: 3.1415876535898 time:0.43399998545647ms
2000000 pi: 3.1415921535897 time:3.9019999504089ms
20000000 pi: 3.1415926035898 time:38.122001647949ms
200000000 pi: 3.1415926485894 time:380.98202514648ms
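2000000000 pi: 3.1415926585052 time:3657.6279296875ms
This last value is less accurate than the threaded results for the same n (3.1415926530893). It was produced with an int loop counter k, and once k exceeds about 1.07×10^9, 2*k+1 overflows int.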
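A caveat on the timings in this post: clock() returns the processor time used by the whole process. On Linux and macOS that total includes the CPU time of all of the process's threads. clock() therefore does not show wall-clock speedup, which may be why the multithreaded timings below are no smaller than the serial ones.

Below is a minimal sketch that measures both kinds of time, assuming C++11 `<chrono>`. `Timing` and `time_both` are hypothetical names, not part of the original programs.

```cpp
#include <chrono>
#include <ctime>

// Hypothetical result type: both measurements for one call, in milliseconds.
struct Timing {
    double wall_ms;  // elapsed real time, from std::chrono::steady_clock
    double cpu_ms;   // processor time of the whole process, from std::clock
};

// Hypothetical helper: runs `work` once and times it with both clocks.
template <typename F>
Timing time_both(F work) {
    auto wall_begin = std::chrono::steady_clock::now();
    std::clock_t cpu_begin = std::clock();
    work();
    std::clock_t cpu_end = std::clock();
    auto wall_end = std::chrono::steady_clock::now();

    Timing t;
    t.wall_ms = std::chrono::duration<double, std::milli>(wall_end - wall_begin).count();
    t.cpu_ms = 1000.0 * (cpu_end - cpu_begin) / CLOCKS_PER_SEC;
    return t;
}
```

One way to use it is to wrap the pthread_create and pthread_join loops in a lambda passed to time_both. With four busy threads, cpu_ms can approach four times wall_ms.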
Naive multithreading (ignoring race conditions)
#include <iostream>
#include <pthread.h>
#include <ctime>
using namespace std;
#define THREAD_NUM 4
double sum = 0;   // shared by all threads and updated without synchronization
double factor;    // also shared: every thread sets and flips the same variable (a second race)
struct threadParam
{
    int thread_id; // index of this thread, 0..THREAD_NUM-1
    long n;        // total number of terms (assumed divisible by THREAD_NUM)
};
void* pi_race(void* parm) { // a thread function takes a void* and returns a void*
    struct threadParam * p = (struct threadParam *)parm; // convert void* back to struct threadParam*
    int r = p->thread_id;
    long n = p->n;
    long my_n = n/THREAD_NUM;          // each thread sums one contiguous block of terms
    long my_first = my_n * r;
    long my_last = my_first + my_n;
    if (my_first % 2 == 0)
        factor = 1.0;
    else
        factor = -1.0;
    // race condition: unsynchronized read-modify-write of the globals sum and factor
    for (long i=my_first; i<my_last; i++, factor=-factor) {
        sum += factor/(2*i+1);
    }
    pthread_exit(NULL); // terminate the thread; NULL is the value pthread_join receives
}
int main() {
    pthread_t thread[THREAD_NUM];
    struct threadParam tp[THREAD_NUM];
    clock_t begin, end;
    float total;
    for (long k=1; k<100000; k*=10)   // n = 2e5, 2e6, ..., 2e9
    {
        long n = 200000*k;
        sum = 0; // reset sum before every experiment, or it keeps growing
        begin = clock();
        for (int i=0; i<THREAD_NUM; i++) {
            tp[i].thread_id = i;
            tp[i].n = n; // pack the parameters in a struct so they can be passed as one pointer
            // the fourth argument is passed to the function named by the third argument
            pthread_create(&thread[i], NULL, pi_race, (void*)&tp[i]);
        }
        for (int i=0; i<THREAD_NUM; i++) {
            pthread_join(thread[i], NULL);   // wait for every worker to finish
        }
        end = clock();
        total = (float) (end-begin)*1000 / CLOCKS_PER_SEC;
        cout.width(14);
        cout.setf(ios::left);
        cout.precision(14);
        cout<<n<<"pi: "<<4 * sum<<" time:"<<total<<endl;
    }
    return 0;
}
200000 pi: 3.3333333331573e-06 time:0.95899999141693
2000000 pi: 1.6666666666651e-07 time:5.6389999389648
20000000 pi: 1.6666666666659e-08 time:64.612998962402
200000000 pi: 3.3333333333756e-09 time:610.11999511719
2000000000 pi: 3.1415926515893 time:6361.9799804688
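- When several threads update the global variable sum at the same time, one thread can load sum from memory, and before it writes the new value back, another thread has already changed it. Updates are lost, so the result is unpredictable. The global factor, which every thread sets and flips, has the same problem.
- The first argument of pthread_create() is a pointer to a pthread_t. Usually you declare an array pthread_t threads[n] and pass &threads[i] as the first argument.
- The fourth argument of pthread_create() is a void*. It is handed to the function the new thread runs (the third argument), and that function must have a void* return type and a single void* parameter. The usual approach is to cast the address of the data to void* when creating the thread, then cast it back to a pointer of the original type inside the thread function.
  - For an int k=2, pass (void*)&k, receive it with int* m = (int*) args, and then use *m.
  - For the struct above, each thread is given its own struct variable. Its address is passed as the fourth argument and converted back to a struct pointer inside the thread function.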
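The two parameter-passing patterns just described can be sketched as follows, assuming a POSIX system. `double_int`, `square_id`, `WorkerArg`, `pass_int_example` and `pass_struct_example` are hypothetical names.

Each thread gets its own WorkerArg. A single shared struct, or the address of the loop variable i, could be overwritten before the new thread reads it.

```cpp
#include <pthread.h>
#include <vector>

// Hypothetical thread function: receives the address of an int, cast to void*.
void* double_int(void* args) {
    int* m = (int*) args;   // convert void* back to int*
    *m = *m * 2;            // works directly on the caller's variable
    return NULL;
}

// Hypothetical per-thread parameter block.
struct WorkerArg {
    int thread_id;
    long result;
};

void* square_id(void* args) {
    WorkerArg* p = (WorkerArg*) args;   // convert void* back to WorkerArg*
    p->result = (long) p->thread_id * p->thread_id;
    return NULL;
}

// Pass one int by address; k must stay alive until the thread is joined.
int pass_int_example(int k) {
    pthread_t t;
    pthread_create(&t, NULL, double_int, (void*) &k);
    pthread_join(t, NULL);
    return k;
}

// One WorkerArg per thread, so no two threads share a parameter block.
std::vector<long> pass_struct_example(int num_threads) {
    std::vector<pthread_t> threads(num_threads);
    std::vector<WorkerArg> args(num_threads);   // never resized while threads run
    for (int i = 0; i < num_threads; i++) {
        args[i].thread_id = i;
        args[i].result = 0;
        pthread_create(&threads[i], NULL, square_id, (void*) &args[i]);
    }
    std::vector<long> results;
    for (int i = 0; i < num_threads; i++) {
        pthread_join(threads[i], NULL);          // after this, args[i] is safe to read
        results.push_back(args[i].result);
    }
    return results;
}
```

For example, pass_int_example(2) returns 4, and pass_struct_example(4) returns {0, 1, 4, 9}.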
Busy waiting
#include <iostream>
#include <atomic>
#include <pthread.h>
#include <unistd.h>
#include <ctime>
using namespace std;
#define THREAD_NUM 4
double sum = 0;
atomic<int> flag(0);   // id of the thread allowed to update sum next; atomic so the wait loop sees changes
struct threadParam
{
    int thread_id; // index of this thread, 0..THREAD_NUM-1
    long n;        // total number of terms (assumed divisible by THREAD_NUM)
};
void* pi_busywaiting(void* parm) { // a thread function takes a void* and returns a void*
    struct threadParam * p = (struct threadParam *)parm; // convert void* back to struct threadParam*
    int r = p->thread_id;
    long n = p->n;
    long my_n = n/THREAD_NUM;
    long my_first = my_n * r;
    long my_last = my_first + my_n;
    double my_sum = 0.0;   // private partial sum, so the loop touches no shared data
    double factor = (my_first % 2 == 0) ? 1.0 : -1.0;   // local, not shared between threads
    for (long i=my_first; i<my_last; i++, factor=-factor) {
        my_sum += factor/(2*i+1);
    }
    // busy waiting: spin until it is this thread's turn (threads add in the order 0, 1, 2, 3)
    while (flag != r)
        sleep(0);
    sum += my_sum;   // critical section: only the thread whose turn it is gets here
    flag++;          // pass the turn to thread r+1
    pthread_exit(NULL); // terminate the thread; NULL is the value pthread_join receives
}
int main() {
    pthread_t thread[THREAD_NUM];
    struct threadParam tp[THREAD_NUM];
    clock_t begin, end;
    float total;
    for (long k=1; k<100000; k*=10)   // n = 2e5, 2e6, ..., 2e9
    {
        long n = 200000*k;
        sum = 0;  // reset sum before every experiment, or it keeps growing
        flag = 0; // reset flag before every experiment, or thread 0 waits forever
        begin = clock();
        for (int i=0; i<THREAD_NUM; i++) {
            tp[i].thread_id = i;
            tp[i].n = n; // pack the parameters in a struct so they can be passed as one pointer
            // the fourth argument is passed to the function named by the third argument
            pthread_create(&thread[i], NULL, pi_busywaiting, (void*)&tp[i]);
        }
        for (int i=0; i<THREAD_NUM; i++) {
            pthread_join(thread[i], NULL);
        }
        end = clock();
        total = (float) (end-begin)*1000 / CLOCKS_PER_SEC;
        cout.width(14);
        cout.setf(ios::left);
        cout.precision(14);
        cout<<n<<"pi: "<<4 * sum<<" time:"<<total<<endl;
    }
    return 0;
}
200000 pi: 3.1415876535898 time:0.77899998426437
2000000 pi: 3.1415921535897 time:4.7909998893738
20000000 pi: 3.1415926035898 time:53.270000457764
200000000 pi: 3.1415926485903 time:550.86700439453
2000000000 pi: 3.1415926530893 time:6261.0771484375
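- pthread_join(pthread_t __th, void **__thread_return)
  - The first argument identifies the thread to wait for. The second is a user-supplied pointer that receives the waited-for thread's return value; pass NULL if the value is not needed.
  - The call blocks: the calling thread waits until the target thread has terminated.
- pthread_exit terminates the calling thread explicitly, without returning from its thread function. It is normally called from inside the thread itself.
- Typically a thread ends by calling pthread_exit with a return value. The main thread has been blocked in pthread_join waiting for it, so it now resumes. The value passed to pthread_exit is delivered through pthread_join's second argument, which lets a worker thread pass a result back to the main thread.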
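The return-value path described above can be sketched as follows, assuming a POSIX system. `partial_sum` and `leibniz_via_join` are hypothetical names.

The result is allocated on the heap, because a pointer to a local variable would dangle once the thread has exited. The joining thread takes ownership and frees it.

```cpp
#include <pthread.h>

// Hypothetical worker: sums the first n Leibniz terms and returns the sum via pthread_exit.
void* partial_sum(void* args) {
    long n = *(long*) args;
    double* result = new double(0.0);   // heap-allocated so it outlives the thread
    double factor = 1.0;
    for (long k = 0; k < n; k++, factor = -factor)
        *result += factor / (2 * k + 1);
    pthread_exit(result);               // this value is delivered to pthread_join
}

double leibniz_via_join(long n) {
    pthread_t t;
    pthread_create(&t, NULL, partial_sum, &n);
    void* ret = NULL;
    pthread_join(t, &ret);              // blocks until the worker exits, then receives its value
    double sum = *(double*) ret;
    delete (double*) ret;               // the joining thread frees the result
    return 4 * sum;
}
```

leibniz_via_join(1000) returns about 3.1406, which is π − 1/1000.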
Mutex
#include <iostream>
#include <pthread.h>
#include <ctime>
using namespace std;
#define THREAD_NUM 4
double sum = 0;
pthread_mutex_t amutex;   // protects sum
struct threadParam
{
    int thread_id; // index of this thread, 0..THREAD_NUM-1
    long n;        // total number of terms (assumed divisible by THREAD_NUM)
};
void* pi_mutex(void* parm) { // a thread function takes a void* and returns a void*
    struct threadParam * p = (struct threadParam *)parm; // convert void* back to struct threadParam*
    int r = p->thread_id;
    long n = p->n;
    long my_n = n/THREAD_NUM;
    long my_first = my_n * r;
    long my_last = my_first + my_n;
    double my_sum = 0.0;   // private partial sum
    double factor = (my_first % 2 == 0) ? 1.0 : -1.0;   // local, not shared between threads
    for (long i=my_first; i<my_last; i++, factor=-factor) {
        my_sum += factor/(2*i+1);
    }
    // mutex: only one thread at a time may execute the update of sum
    pthread_mutex_lock(&amutex);
    sum += my_sum;
    pthread_mutex_unlock(&amutex);
    pthread_exit(NULL); // terminate the thread; NULL is the value pthread_join receives
}
int main() {
    pthread_t thread[THREAD_NUM];
    struct threadParam tp[THREAD_NUM];
    clock_t begin, end;
    float total;
    for (long k=1; k<100000; k*=10)   // n = 2e5, 2e6, ..., 2e9
    {
        long n = 200000*k;
        sum = 0; // reset sum before every experiment, or it keeps growing
        pthread_mutex_init(&amutex, NULL);
        begin = clock();
        for (int i=0; i<THREAD_NUM; i++) {
            tp[i].thread_id = i;
            tp[i].n = n; // pack the parameters in a struct so they can be passed as one pointer
            // the fourth argument is passed to the function named by the third argument
            pthread_create(&thread[i], NULL, pi_mutex, (void*)&tp[i]);
        }
        for (int i=0; i<THREAD_NUM; i++) {
            pthread_join(thread[i], NULL);
        }
        pthread_mutex_destroy(&amutex);
        end = clock();
        total = (float) (end-begin)*1000 / CLOCKS_PER_SEC;
        cout.width(14);
        cout.setf(ios::left);
        cout.precision(14);
        cout<<n<<"pi: "<<4 * sum<<" time:"<<total<<endl;
    }
    return 0;
}
200000 pi: 3.1415876535898 time:1.3339999914169
2000000 pi: 3.1415921535897 time:5.3889999389648
20000000 pi: 3.1415926035898 time:64.967002868652
200000000 pi: 3.1415926485903 time:625.01000976562
2000000000 pi: 3.1415926530893 time:6139.1000976562
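Semaphores
- Initializing a semaphore
  - #include <semaphore.h>
  - sem_t sem;
  - int sem_init(sem_t* sem, int pshared, unsigned value);
    - pshared: nonzero, the semaphore is shared between processes; 0, it is shared only by the threads of one process
    - value: the initial value of the semaphore
- Using a semaphore
  - int sem_wait(sem_t * sem); // decrements the value; if it is already 0, blocks until it becomes positive
  - int sem_post(sem_t * sem); // increments the value; if it was 0, a blocked thread may be woken up
- Destroying a semaphore
  - int sem_destroy(sem_t* sem);
- On macOS, which does not support unnamed semaphores (sem_init), named semaphores are used instead:
  - Initializing a semaphore
    - sem_t* sem;
    - sem = sem_open("semstr", O_CREAT, 0666, 0);
  - Using a semaphore
    - int sem_wait(sem_t*);
    - int sem_post(sem_t*);
  - Releasing a semaphore
    - sem_close(sem);
    - sem_unlink("semstr");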
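The program below uses semaphores to order the threads. A binary semaphore (initial value 1) can also serve as a lock, much like the mutex in the previous section.

Here is a minimal sketch of that use with the unnamed-semaphore API listed above. It assumes Linux; on macOS sem_init is not supported, so the sem_open variant would be needed. `lock_sem`, `counter`, `add_many` and `count_with_semaphore` are hypothetical names.

```cpp
#include <pthread.h>
#include <semaphore.h>
#include <vector>

// Hypothetical shared state: a counter guarded by a binary semaphore.
static sem_t lock_sem;
static long counter = 0;

void* add_many(void* args) {
    long times = *(long*) args;     // every thread only reads this value
    for (long i = 0; i < times; i++) {
        sem_wait(&lock_sem);        // 1 -> 0: enter; blocks while another thread is inside
        counter++;
        sem_post(&lock_sem);        // 0 -> 1: leave; may wake a blocked thread
    }
    return NULL;
}

long count_with_semaphore(int num_threads, long times) {
    sem_init(&lock_sem, 0, 1);      // pshared = 0: threads of this process only; initial value 1
    counter = 0;
    std::vector<pthread_t> threads(num_threads);
    for (int i = 0; i < num_threads; i++)
        pthread_create(&threads[i], NULL, add_many, &times);
    for (int i = 0; i < num_threads; i++)
        pthread_join(threads[i], NULL);
    sem_destroy(&lock_sem);
    return counter;
}
```

count_with_semaphore(4, 10000) returns 40000. Without the sem_wait/sem_post pair, the increments would race just like sum in the naive version.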
#include <iostream>
#include <pthread.h>
#include <fcntl.h>      // O_CREAT
#include <ctime>
#include <semaphore.h>
using namespace std;
#define THREAD_NUM 4
double sum = 0;
sem_t* sem_parent;    // each child posts it when done; the parent waits on it THREAD_NUM times
sem_t* sem_children;  // the parent posts it THREAD_NUM times to release the children
sem_t* sem_lock;      // binary semaphore (initial value 1) protecting sum
struct threadParam
{
    int thread_id; // index of this thread, 0..THREAD_NUM-1
    long n;        // total number of terms (assumed divisible by THREAD_NUM)
};
void* pi_semaphore(void* parm) { // a thread function takes a void* and returns a void*
    struct threadParam * p = (struct threadParam *)parm; // convert void* back to struct threadParam*
    int r = p->thread_id;
    long n = p->n;
    long my_n = n/THREAD_NUM;
    long my_first = my_n * r;
    long my_last = my_first + my_n;
    double my_sum = 0.0;   // private partial sum
    double factor = (my_first % 2 == 0) ? 1.0 : -1.0;   // local, not shared between threads
    for (long i=my_first; i<my_last; i++, factor=-factor) {
        my_sum += factor/(2*i+1);
    }
    cout<<"I am the thread "<<r<<endl;
    // sem_parent/sem_children only order the threads; the update of sum still needs a lock
    sem_wait(sem_lock);
    sum += my_sum;
    sem_post(sem_lock);
    sem_post(sem_parent);     // tell the parent this thread has finished its part
    sem_wait(sem_children);   // wait until the parent releases all children
    cout<<"pi has been calculated by thread "<<r<<endl;
    pthread_exit(NULL); // terminate the thread; NULL is the value pthread_join receives
}
int main() {
    pthread_t thread[THREAD_NUM];
    struct threadParam tp[THREAD_NUM];
    clock_t begin, end;
    float total;
    for (long k=1; k<100000; k*=10)   // n = 2e5, 2e6, ..., 2e9
    {
        long n = 200000*k;
        sum = 0; // reset sum before every experiment, or it keeps growing
        // macOS does not support unnamed semaphores (sem_init/sem_destroy are deprecated there),
        // so named semaphores are created with sem_open and removed with sem_unlink
        sem_parent = sem_open("/sem_parent", O_CREAT, 0666, 0);
        sem_children = sem_open("/sem_children", O_CREAT, 0666, 0);
        sem_lock = sem_open("/sem_lock", O_CREAT, 0666, 1);
        begin = clock();
        for (int i=0; i<THREAD_NUM; i++) {
            tp[i].thread_id = i;
            tp[i].n = n; // pack the parameters in a struct so they can be passed as one pointer
            // the fourth argument is passed to the function named by the third argument
            pthread_create(&thread[i], NULL, pi_semaphore, (void*)&tp[i]);
        }
        for (int i=0; i<THREAD_NUM; i++) {
            sem_wait(sem_parent);    // returns once for each finished child
        }
        cout<<"all the children thread is finished"<<endl;
        for (int i=0; i<THREAD_NUM; i++) {
            sem_post(sem_children);  // release every child
        }
        for (int i=0; i<THREAD_NUM; i++) {
            pthread_join(thread[i], NULL);
        }
        sem_close(sem_parent);
        sem_close(sem_children);
        sem_close(sem_lock);
        sem_unlink("/sem_parent");
        sem_unlink("/sem_children");
        sem_unlink("/sem_lock");
        end = clock();
        total = (float) (end-begin)*1000 / CLOCKS_PER_SEC;
        cout.width(14);
        cout.setf(ios::left);
        cout.precision(14);
        cout<<n<<"pi: "<<4 * sum<<" time:"<<total<<endl<<endl;
    }
    return 0;
}
I am the thread 0
I am the thread I am the thread 1
I am the thread 2
3
all the children thread is finished
pi has been calculated by thread pi has been calculated by thread 0
1
pi has been calculated by thread 2
pi has been calculated by thread 3
200000 pi: 3.1415876535898 time:1.279000043869
I am the thread 0
I am the thread 2
I am the thread 1
I am the thread 3
all the children thread is finished
pi has been calculated by thread 2
pi has been calculated by thread 0
pi has been calculated by thread 1
pi has been calculated by thread 3
2000000 pi: 3.1415921535897 time:6.0440001487732
I am the thread 2
I am the thread 0
I am the thread 1
I am the thread 3
all the children thread is finished
pi has been calculated by thread 1
pi has been calculated by thread 0
pi has been calculated by thread 3
pi has been calculated by thread 2
20000000 pi: 3.1415926035898 time:65.355003356934
I am the thread 2
I am the thread 0
I am the thread 1
I am the thread 3
all the children thread is finished
pi has been calculated by thread 0
pi has been calculated by thread 2
pi has been calculated by thread pi has been calculated by thread 3
1
200000000 pi: 3.1415926485903 time:640.81597900391
I am the thread 3
I am the thread 2
I am the thread 0
I am the thread 1
all the children thread is finished
pi has been calculated by thread 0
pi has been calculated by thread 1
pi has been calculated by thread 2
pi has been calculated by thread 3
2000000000 pi: 3.1415926530893 time:6508.8139648438
barrier
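- Initializing a barrier
  - pthread_barrier_t b;
  - pthread_barrier_init(&b, NULL, 3);
    - The second argument specifies the barrier's attributes; NULL means the default attributes
    - The third argument is the number of threads to wait for: with 3, the threads continue only after three of them have reached the barrier
- Blocking threads at the barrier
  - pthread_barrier_wait(&b);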
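The sketch below shows how pthread_barrier_wait could replace the hand-built sem_parent/sem_children synchronization of the semaphore program. It assumes Linux, since macOS does not provide pthread barriers. `BarrierJob`, `BarrierArg`, `barrier_worker` and `pi_with_barrier` are hypothetical names.

Each thread stores its partial sum in its own slot. pthread_barrier_wait returns PTHREAD_BARRIER_SERIAL_THREAD to exactly one of the threads, and that thread combines the slots.

```cpp
#include <pthread.h>
#include <vector>

// Hypothetical shared state for one run.
struct BarrierJob {
    pthread_barrier_t barrier;
    std::vector<double> partial;   // one slot per thread, so no lock is needed
    long n;                        // total number of terms (assumed divisible by num_threads)
    int num_threads;
    double pi;
};

struct BarrierArg { BarrierJob* job; int id; };

void* barrier_worker(void* p) {
    BarrierArg* a = (BarrierArg*) p;
    BarrierJob* job = a->job;
    long my_n = job->n / job->num_threads;
    long first = my_n * a->id;
    double factor = (first % 2 == 0) ? 1.0 : -1.0;
    double my_sum = 0.0;
    for (long i = first; i < first + my_n; i++, factor = -factor)
        my_sum += factor / (2 * i + 1);
    job->partial[a->id] = my_sum;
    // Every thread blocks here until all num_threads threads have arrived.
    int rc = pthread_barrier_wait(&job->barrier);
    if (rc == PTHREAD_BARRIER_SERIAL_THREAD) {   // exactly one thread gets this value
        double sum = 0.0;
        for (double s : job->partial) sum += s;
        job->pi = 4 * sum;
    }
    return NULL;
}

double pi_with_barrier(long n, int num_threads) {
    BarrierJob job;
    job.partial.assign(num_threads, 0.0);
    job.n = n;
    job.num_threads = num_threads;
    job.pi = 0.0;
    pthread_barrier_init(&job.barrier, NULL, num_threads);
    std::vector<pthread_t> threads(num_threads);
    std::vector<BarrierArg> args(num_threads);
    for (int i = 0; i < num_threads; i++) {
        args[i].job = &job;
        args[i].id = i;
        pthread_create(&threads[i], NULL, barrier_worker, &args[i]);
    }
    for (int i = 0; i < num_threads; i++)
        pthread_join(threads[i], NULL);
    pthread_barrier_destroy(&job.barrier);
    return job.pi;
}
```

pi_with_barrier(200000, 4) should return approximately 3.1415877, the same value as the n = 200000 rows above up to rounding.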