
[Parallel and Distributed Programming] Examples of writing parallel programs in C++ with the pthread library

程序员文章站 2022-06-03 21:21:45

Problem description

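pi = 4 * (1 - 1/3 + 1/5 - 1/7 + ...) = 4 * sum over k >= 0 of (-1)^k / (2k+1)
Using the formula above (the Leibniz series, which is what all the programs below implement), write programs that compute the value of pi for different problem sizes n, and compare their running times.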
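As a rough guide to the accuracy one can expect from n terms, the standard alternating-series estimate can be written out. The symbol S_n for the partial sum is introduced here for illustration and does not appear in the original text.

```latex
\pi = 4\sum_{k=0}^{\infty}\frac{(-1)^k}{2k+1},
\qquad
S_n = 4\sum_{k=0}^{n-1}\frac{(-1)^k}{2k+1},
\qquad
\lvert \pi - S_n \rvert \le \frac{4}{2n+1}.
```

For n = 200000 the bound is about 1.0e-5. The value printed by the programs below for that n, 3.1415876535898, differs from pi by about 5.0e-6, which is within the bound.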

Serial version

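#include <iostream>
#include <ctime>
using namespace std;


int main() {
    clock_t begin, end;
    float total;
    double pi_approx;
    // n = 2*10^5, 2*10^6, ..., 2*10^9 (assumes a 64-bit long, as on Linux and macOS)
    for (long n = 200000; n < 10000000000L; n *= 10)
    {
        double sum = 0.0;
        double factor = 1.0;
        begin = clock();
        // k must be a 64-bit integer: with int, 2*k+1 overflows once k exceeds about 1.07e9
        for (long k = 0; k < n; k++, factor = -factor) {
            sum += factor / (2*k + 1);
        }
        pi_approx = 4 * sum;
        end = clock();
        total = (float)(end - begin) * 1000 / CLOCKS_PER_SEC;   // CPU time in milliseconds
        cout.width(14);
        cout.setf(ios::left);
        cout.precision(14);
        cout << n << "pi: " << pi_approx << "     time:" << total << "ms" << endl;
    }
}
200000        pi: 3.1415876535898     time:0.43399998545647ms
2000000       pi: 3.1415921535897     time:3.9019999504089ms
20000000      pi: 3.1415926035898     time:38.122001647949ms
200000000     pi: 3.1415926485894     time:380.98202514648ms
2000000000    pi: 3.1415926585052     time:3657.6279296875ms
Note that the pi value in the last row (n = 2000000000) is inaccurate in its last digits. That run used an int loop counter, and 2*k+1 overflows an int once k exceeds about 1.07e9, which is the most likely cause. With the long counter shown above, the result should match the 3.1415926530893 reported by the multithreaded versions below.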

Naive multithreading (ignoring the race condition)

#include <iostream>
#include <pthread.h>
#include <ctime>
using namespace std;

#define THREAD_NUM 4

double sum = 0;     // shared accumulator, updated by every thread without any synchronization
double factor;      // also shared by all threads, so it suffers from the same race

struct threadParam
{
    int thread_id; // index of the thread, 0 .. THREAD_NUM-1
    long n;        // total number of terms to sum
};

void* pi_race(void* parm) {      // a thread function must take a void* and return a void*
    struct threadParam * p = (struct threadParam *)parm;  // convert void* back to struct threadParam*
    int r = p->thread_id;
    long n = p->n;
    long my_n = n/THREAD_NUM;           // terms per thread (n is a multiple of THREAD_NUM here)
    long my_first = my_n * r;
    long my_last = my_first + my_n;
    if (my_first % 2 == 0)
        factor = 1.0;
    else
        factor = -1.0;
    // race condition: all threads read and write sum (and factor) at the same time
    for (long i=my_first; i<my_last; i++, factor=-factor) {
        sum += factor/(2*i+1);
    }
    pthread_exit(NULL);     // terminate this thread
}

int main() {
    pthread_t thread[THREAD_NUM];
    struct threadParam tp[THREAD_NUM];
    clock_t begin, end;
    float total;
    for (long k=1; k<100000; k*=10)
    {
        long n = 200000*k;
        sum = 0;                // reset sum before every experiment, otherwise it keeps growing
        begin = clock();
        for (int i=0; i<THREAD_NUM; i++) {
            tp[i].thread_id = i;
            tp[i].n = n;            // pack the parameters into a struct so they can be passed as one pointer
            pthread_create(&thread[i], NULL, pi_race, (void*)&tp[i]);    // the fourth argument is passed to the function named by the third
        }
        for (int i=0; i<THREAD_NUM; i++) {
            pthread_join(thread[i], NULL);
        }
        end = clock();

        total = (float) (end-begin)*1000 / CLOCKS_PER_SEC;
        cout.width(14);
        cout.setf(ios::left);
        cout.precision(14);
        cout<<n<<"pi: "<<4 * sum<<"     time:"<<total<<endl;
    }
    return 0;
}
200000        pi: 3.3333333331573e-06     time:0.95899999141693
2000000       pi: 1.6666666666651e-07     time:5.6389999389648
20000000      pi: 1.6666666666659e-08     time:64.612998962402
200000000     pi: 3.3333333333756e-09     time:610.11999511719
2000000000    pi: 3.1415926515893     time:6361.9799804688
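  • When several threads update the global variable sum at the same time, one thread may fetch the variable from memory and, while it is still computing the new value, another thread may already have changed it. The first thread then writes back a result based on a stale value, so the final sum is unpredictable.
  • The first argument of pthread_create() is a pointer to a pthread_t. The usual pattern is to declare an array pthread_t threads[n] and pass &threads[i] as the first argument.
  • The fourth argument of pthread_create() is a void* that is handed to the function the thread will run; that function must itself take a void* parameter and return void*. The usual approach is to cast the address of the data to void* when passing it, and to cast it back to a pointer of the original type inside the function. For example, for an integer int k=2, pass (void*)&k, receive it in the function with int* m = (int*) args, and then use *m. The struct above works the same way: each thread gets its own struct variable, whose address is passed as the fourth argument and converted back to a struct pointer inside the thread function.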
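The argument-passing convention described in the list above can be sketched in isolation. The names `Range`, `square_int`, `range_length`, `square_in_thread` and `length_in_thread` are made up for this illustration and are not part of the article's program. The sketch assumes the pointed-to object stays alive until the thread has been joined.

```cpp
#include <pthread.h>
#include <cassert>
#include <cstddef>

// Hypothetical payload: two inputs and one output field filled in by the thread.
struct Range {
    long first;
    long last;
    long length;
};

// Receives the address of an int, converts it back, and squares the value in place.
void* square_int(void* args) {
    int* m = (int*)args;            // void* -> int*
    *m = (*m) * (*m);
    return NULL;
}

// Receives the address of a Range and stores its result in the same struct.
void* range_length(void* args) {
    Range* r = (Range*)args;        // void* -> Range*
    r->length = r->last - r->first;
    return NULL;
}

// Passes the address of a local int to a thread and returns the squared value.
int square_in_thread(int k) {
    pthread_t t;
    int rc = pthread_create(&t, NULL, square_int, (void*)&k);
    assert(rc == 0);
    (void)rc;
    pthread_join(t, NULL);          // k must outlive the thread, so join before returning
    return k;
}

// Passes the address of a local struct to a thread and returns the computed length.
long length_in_thread(long first, long last) {
    Range r = {first, last, 0};
    pthread_t t;
    int rc = pthread_create(&t, NULL, range_length, (void*)&r);
    assert(rc == 0);
    (void)rc;
    pthread_join(t, NULL);
    return r.length;
}
```

Calling `square_in_thread(2)` hands the thread the address of `k`, exactly as in the `int k=2` example from the list; the struct variant mirrors how `tp[i]` is passed in the programs of this article.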

Busy waiting

#include <atomic>
#include <iostream>
#include <pthread.h>
#include <sched.h>
#include <ctime>
using namespace std;

#define THREAD_NUM 4

double sum = 0;
atomic<int> flag(0);    // id of the thread whose turn it is to update sum; atomic so the spin loop sees every update

struct threadParam
{
    int thread_id; // index of the thread, 0 .. THREAD_NUM-1
    long n;        // total number of terms to sum
};

void* pi_busywaiting(void* parm) {      // a thread function must take a void* and return a void*
    struct threadParam * p = (struct threadParam *)parm;  // convert void* back to struct threadParam*
    int r = p->thread_id;
    long n = p->n;
    long my_n = n/THREAD_NUM;           // terms per thread (n is a multiple of THREAD_NUM here)
    long my_first = my_n * r;
    long my_last = my_first + my_n;
    double my_sum = 0.0;
    // factor is local to each thread: a global shared by all threads would itself be a data race
    double factor = (my_first % 2 == 0) ? 1.0 : -1.0;
    for (long i=my_first; i<my_last; i++, factor=-factor) {
        my_sum += factor/(2*i+1);
    }
    // busy waiting: spin until it is this thread's turn to add its partial sum
    while (flag != r)
        sched_yield();
    sum += my_sum;
    flag++;                 // hand the turn to the next thread
    pthread_exit(NULL);     // terminate this thread
}

int main() {
    pthread_t thread[THREAD_NUM];
    struct threadParam tp[THREAD_NUM];
    clock_t begin, end;
    float total;
    for (long k=1; k<100000; k*=10)
    {
        long n = 200000*k;
        sum = 0;                // reset sum before every experiment, otherwise it keeps growing
        flag = 0;               // reset flag before every experiment, otherwise the threads would wait forever
        begin = clock();
        for (int i=0; i<THREAD_NUM; i++) {
            tp[i].thread_id = i;
            tp[i].n = n;            // pack the parameters into a struct so they can be passed as one pointer
            pthread_create(&thread[i], NULL, pi_busywaiting, (void*)&tp[i]);    // the fourth argument is passed to the function named by the third
        }
        for (int i=0; i<THREAD_NUM; i++) {
            pthread_join(thread[i], NULL);
        }
        end = clock();

        total = (float) (end-begin)*1000 / CLOCKS_PER_SEC;
        cout.width(14);
        cout.setf(ios::left);
        cout.precision(14);
        cout<<n<<"pi: "<<4 * sum<<"     time:"<<total<<endl;
    }
    return 0;
}
200000        pi: 3.1415876535898     time:0.77899998426437
2000000       pi: 3.1415921535897     time:4.7909998893738
20000000      pi: 3.1415926035898     time:53.270000457764
200000000     pi: 3.1415926485903     time:550.86700439453
2000000000    pi: 3.1415926530893     time:6261.0771484375
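  • pthread_join(pthread_t __th, void **__thread_return)
    • The first argument identifies the thread to wait for; the second is a user-supplied pointer that can receive the return value of that thread.
    • The function blocks: the calling thread waits until the target thread has terminated.
  • pthread_exit terminates the calling thread explicitly, without waiting for the thread function to run to its end; it is normally called from inside the thread.
  • Typically a thread calls pthread_exit with a return value. Control then resumes in the main thread's pthread_join (which has been waiting for the thread to end), and the value is delivered to pthread_join. This gives a simple channel from the worker thread back to the main thread.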
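The return-value channel described above can be used to collect partial sums without any shared accumulator. The following is a minimal sketch, not the article's program: `Chunk`, `partial_sum` and `pi_by_join` are hypothetical names, the result is placed on the heap so that it outlives the thread, and n is assumed to be divisible by the number of threads.

```cpp
#include <pthread.h>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical half-open index range [first, last) handled by one thread.
struct Chunk {
    long first;
    long last;
};

// Sums the series terms of one chunk and hands the result back through pthread_exit.
void* partial_sum(void* arg) {
    Chunk* c = (Chunk*)arg;
    double* result = new double(0.0);           // heap storage survives the end of the thread
    double factor = (c->first % 2 == 0) ? 1.0 : -1.0;
    for (long i = c->first; i < c->last; i++, factor = -factor)
        *result += factor / (2 * i + 1);
    pthread_exit(result);                       // the pointer is delivered to pthread_join
}

// Returns 4 * (sum of the first n terms), computed by num_threads threads.
double pi_by_join(long n, int num_threads) {
    std::vector<pthread_t> threads(num_threads);
    std::vector<Chunk> chunks(num_threads);
    long per_thread = n / num_threads;
    for (int i = 0; i < num_threads; i++) {
        chunks[i].first = per_thread * i;
        chunks[i].last = chunks[i].first + per_thread;
        int rc = pthread_create(&threads[i], NULL, partial_sum, &chunks[i]);
        assert(rc == 0);
        (void)rc;
    }
    double sum = 0.0;
    for (int i = 0; i < num_threads; i++) {
        void* ret = NULL;
        pthread_join(threads[i], &ret);         // blocks until thread i ends, then receives its value
        double* part = (double*)ret;
        sum += *part;                           // only the main thread touches sum, so there is no race
        delete part;
    }
    return 4.0 * sum;
}
```

Because only the joining thread adds up the results, this variant needs neither busy waiting nor a lock; the cost is one small heap allocation per thread.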

Mutex

#include <iostream>
#include <pthread.h>
#include <ctime>
using namespace std;

#define THREAD_NUM 4

double sum = 0;
pthread_mutex_t amutex;     // protects sum

struct threadParam
{
    int thread_id; // index of the thread, 0 .. THREAD_NUM-1
    long n;        // total number of terms to sum
};

void* pi_mutex(void* parm) {      // a thread function must take a void* and return a void*
    struct threadParam * p = (struct threadParam *)parm;  // convert void* back to struct threadParam*
    int r = p->thread_id;
    long n = p->n;
    long my_n = n/THREAD_NUM;           // terms per thread (n is a multiple of THREAD_NUM here)
    long my_first = my_n * r;
    long my_last = my_first + my_n;
    double my_sum = 0.0;
    // factor is local to each thread: a global shared by all threads would itself be a data race
    double factor = (my_first % 2 == 0) ? 1.0 : -1.0;
    for (long i=my_first; i<my_last; i++, factor=-factor) {
        my_sum += factor/(2*i+1);
    }
    
    // mutex: only one thread at a time may update sum
    pthread_mutex_lock(&amutex);
    sum += my_sum;
    pthread_mutex_unlock(&amutex);
    
    pthread_exit(NULL);     // terminate this thread
}

int main() {
    pthread_t thread[THREAD_NUM];
    struct threadParam tp[THREAD_NUM];
    clock_t begin, end;
    float total;
    for (long k=1; k<100000; k*=10)
    {
        long n = 200000*k;
        sum = 0;                // reset sum before every experiment, otherwise it keeps growing
        pthread_mutex_init(&amutex, NULL);
        begin = clock();
        for (int i=0; i<THREAD_NUM; i++) {
            tp[i].thread_id = i;
            tp[i].n = n;            // pack the parameters into a struct so they can be passed as one pointer
            pthread_create(&thread[i], NULL, pi_mutex, (void*)&tp[i]);    // the fourth argument is passed to the function named by the third
        }
        for (int i=0; i<THREAD_NUM; i++) {
            pthread_join(thread[i], NULL);
        }
        pthread_mutex_destroy(&amutex);
        end = clock();

        total = (float) (end-begin)*1000 / CLOCKS_PER_SEC;
        cout.width(14);
        cout.setf(ios::left);
        cout.precision(14);
        cout<<n<<"pi: "<<4 * sum<<"     time:"<<total<<endl;
    }
    return 0;
}
200000        pi: 3.1415876535898     time:1.3339999914169
2000000       pi: 3.1415921535897     time:5.3889999389648
20000000      pi: 3.1415926035898     time:64.967002868652
200000000     pi: 3.1415926485903     time:625.01000976562
2000000000    pi: 3.1415926530893     time:6139.1000976562
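The threaded timings above (about 6139 ms for n = 2000000000) are larger than the serial one (about 3658 ms) even though the work is split four ways. A likely reason is that clock() reports the processor time consumed by the whole process, which on POSIX systems adds up the CPU time of all threads, rather than the elapsed wall-clock time. The sketch below measures both; `Timing` and `time_both` are hypothetical helpers, not part of the original programs.

```cpp
#include <chrono>
#include <ctime>
#include <cassert>

// Result of timing one call, in milliseconds.
struct Timing {
    double cpu_ms;    // processor time from clock(), summed over all threads of the process
    double wall_ms;   // elapsed real time from a monotonic clock
};

// Runs work() once and measures it both ways. `work` can be any callable,
// for example a lambda that creates the threads and joins them.
template <typename Work>
Timing time_both(Work work) {
    std::clock_t cpu_begin = std::clock();
    std::chrono::steady_clock::time_point wall_begin = std::chrono::steady_clock::now();
    work();
    std::chrono::steady_clock::time_point wall_end = std::chrono::steady_clock::now();
    std::clock_t cpu_end = std::clock();

    Timing t;
    t.cpu_ms = 1000.0 * (cpu_end - cpu_begin) / CLOCKS_PER_SEC;
    t.wall_ms = std::chrono::duration<double, std::milli>(wall_end - wall_begin).count();
    return t;
}
```

In main, the region between begin = clock() and end = clock() could be wrapped as time_both([&] { /* create and join the threads */ }). On a machine with four idle cores one would expect wall_ms to be noticeably smaller than cpu_ms for the threaded versions, though that was not measured here.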

Semaphore

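  • Initialize the semaphore
    • #include <semaphore.h>
    • sem_t sem;
    • int sem_init(sem_t* sem, int pshared, unsigned value);
      • pshared: nonzero, shared between processes; 0, shared between the threads of one process
      • value: initial value of the semaphore
  • Use the semaphore
    • int sem_wait(sem_t * sem); // decrements the value by 1; blocks if it is already 0
    • int sem_post(sem_t * sem); // increments the value by 1; if it was 0, a blocked thread may be woken up
  • Release the semaphore
    • int sem_destroy(sem_t* sem);
  • On mac os (named semaphores)
    • Initialize the semaphore
      - sem_t* sem;
      - sem = sem_open("semstr", O_CREAT, 0666, 0);
    • Use the semaphore
      - int sem_wait(sem_t*);
      - int sem_post(sem_t*);
    • Release the semaphore
      - sem_unlink("semstr");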
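The unnamed-semaphore calls from the first part of the list (sem_init, sem_wait, sem_post, sem_destroy) can be sketched as a lock around the shared sum. This is an illustrative variant for systems that support unnamed semaphores, such as Linux; it is not the program from this article, and `kThreads`, `Slice`, `add_slice` and `pi_with_semaphore` are hypothetical names. It assumes n is divisible by the thread count.

```cpp
#include <pthread.h>
#include <semaphore.h>
#include <cassert>
#include <cstddef>

static const int kThreads = 4;      // hypothetical thread count for this sketch
static sem_t sum_sem;               // binary semaphore guarding shared_sum
static double shared_sum = 0.0;     // accumulator shared by all threads

// Half-open index range [first, last) handled by one thread.
struct Slice {
    long first;
    long last;
};

void* add_slice(void* arg) {
    Slice* s = (Slice*)arg;
    double my_sum = 0.0;
    double factor = (s->first % 2 == 0) ? 1.0 : -1.0;
    for (long i = s->first; i < s->last; i++, factor = -factor)
        my_sum += factor / (2 * i + 1);
    sem_wait(&sum_sem);             // 1 -> 0: enter the critical section, other threads block here
    shared_sum += my_sum;
    sem_post(&sum_sem);             // 0 -> 1: leave it, possibly waking one waiting thread
    return NULL;
}

double pi_with_semaphore(long n) {
    pthread_t threads[kThreads];
    Slice slices[kThreads];
    shared_sum = 0.0;
    int rc = sem_init(&sum_sem, 0, 1);   // pshared = 0 (threads of one process), initial value 1
    assert(rc == 0);
    (void)rc;
    long per_thread = n / kThreads;
    for (int i = 0; i < kThreads; i++) {
        slices[i].first = per_thread * i;
        slices[i].last = slices[i].first + per_thread;
        pthread_create(&threads[i], NULL, add_slice, &slices[i]);
    }
    for (int i = 0; i < kThreads; i++)
        pthread_join(threads[i], NULL);
    sem_destroy(&sum_sem);
    return 4.0 * shared_sum;
}
```

With an initial value of 1 the semaphore behaves like a mutex; an initial value of 0, as in the sem_open call in the list, instead makes the first sem_wait block until some other thread calls sem_post, which is the signalling pattern.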
#include <cstdio>
#include <iostream>
#include <pthread.h>
#include <fcntl.h>          // O_CREAT
#include <ctime>
#include <semaphore.h>
using namespace std;

#define THREAD_NUM 4

double sum = 0;
pthread_mutex_t sum_mutex = PTHREAD_MUTEX_INITIALIZER;  // protects sum

sem_t* sem_parent;      // posted by each child when its part is done
sem_t* sem_children;    // posted by the parent to release the children

struct threadParam
{
    int thread_id; // index of the thread, 0 .. THREAD_NUM-1
    long n;        // total number of terms to sum
};

void* pi_semaphore(void* parm) {      // a thread function must take a void* and return a void*
    struct threadParam * p = (struct threadParam *)parm;  // convert void* back to struct threadParam*
    int r = p->thread_id;
    long n = p->n;
    long my_n = n/THREAD_NUM;           // terms per thread (n is a multiple of THREAD_NUM here)
    long my_first = my_n * r;
    long my_last = my_first + my_n;
    double my_sum = 0.0;
    // factor is local to each thread: a global shared by all threads would itself be a data race
    double factor = (my_first % 2 == 0) ? 1.0 : -1.0;
    for (long i=my_first; i<my_last; i++, factor=-factor) {
        my_sum += factor/(2*i+1);
    }
    
    cout<<"I am the thread "<<r<<endl;
    pthread_mutex_lock(&sum_mutex);     // sum is shared, so the update still has to be protected
    sum += my_sum;
    pthread_mutex_unlock(&sum_mutex);
    
    sem_post(sem_parent);       // tell the parent that this thread has finished its part
    sem_wait(sem_children);     // wait until the parent has seen all threads finish
    
    cout<<"pi has been calculated by thread "<<r<<endl;
    
    pthread_exit(NULL);     // terminate this thread
}

int main() {
    pthread_t thread[THREAD_NUM];
    struct threadParam tp[THREAD_NUM];
    clock_t begin, end;
    float total;
    for (long k=1; k<100000; k*=10)
    {
        long n = 200000*k;
        sum = 0;                // reset sum before every experiment, otherwise it keeps growing
        
        // mac os does not support unnamed semaphores (sem_init/sem_destroy), so named ones are used
        sem_unlink("/sem_parent");      // drop stale semaphores left behind by an earlier, aborted run
        sem_unlink("/sem_children");
        sem_parent = sem_open("/sem_parent", O_CREAT, 0666, 0);
        sem_children = sem_open("/sem_children", O_CREAT, 0666, 0);
        if (sem_parent == SEM_FAILED || sem_children == SEM_FAILED) {
            perror("sem_open");
            return 1;
        }
        
        begin = clock();
        for (int i=0; i<THREAD_NUM; i++) {
            tp[i].thread_id = i;
            tp[i].n = n;            // pack the parameters into a struct so they can be passed as one pointer
            pthread_create(&thread[i], NULL, pi_semaphore, (void*)&tp[i]);    // the fourth argument is passed to the function named by the third
        }
        for (int i=0; i<THREAD_NUM; i++) {
            sem_wait(sem_parent);       // one wait per child: returns once every child has posted
        }
        cout<<"all the children thread is finished"<<endl;
        for (int i=0; i<THREAD_NUM; i++) {
            sem_post(sem_children);     // release the children one by one
        }
        
        for (int i=0; i<THREAD_NUM; i++) {
            pthread_join(thread[i], NULL);
        }
        sem_close(sem_parent);
        sem_close(sem_children);
        sem_unlink("/sem_parent");
        sem_unlink("/sem_children");
        end = clock();

        total = (float) (end-begin)*1000 / CLOCKS_PER_SEC;
        cout.width(14);
        cout.setf(ios::left);
        cout.precision(14);
        cout<<n<<"pi: "<<4 * sum<<"     time:"<<total<<endl<<endl;
    }
    return 0;
}
I am the thread 0
I am the thread I am the thread 1
I am the thread 2
3
all the children thread is finished
pi has been calculated by thread pi has been calculated by thread 0
1
pi has been calculated by thread 2
pi has been calculated by thread 3
200000        pi: 3.1415876535898     time:1.279000043869

I am the thread 0
I am the thread 2
I am the thread 1
I am the thread 3
all the children thread is finished
pi has been calculated by thread 2
pi has been calculated by thread 0
pi has been calculated by thread 1
pi has been calculated by thread 3
2000000       pi: 3.1415921535897     time:6.0440001487732

I am the thread 2
I am the thread 0
I am the thread 1
I am the thread 3
all the children thread is finished
pi has been calculated by thread 1
pi has been calculated by thread 0
pi has been calculated by thread 3
pi has been calculated by thread 2
20000000      pi: 3.1415926035898     time:65.355003356934

I am the thread 2
I am the thread 0
I am the thread 1
I am the thread 3
all the children thread is finished
pi has been calculated by thread 0
pi has been calculated by thread 2
pi has been calculated by thread pi has been calculated by thread 3
1
200000000     pi: 3.1415926485903     time:640.81597900391

I am the thread 3
I am the thread 2
I am the thread 0
I am the thread 1
all the children thread is finished
pi has been calculated by thread 0
pi has been calculated by thread 1
pi has been calculated by thread 2
pi has been calculated by thread 3
2000000000    pi: 3.1415926530893     time:6508.8139648438

Barrier

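  • Initialize the barrier
    • pthread_barrier_t b;
    • pthread_barrier_init(&b, NULL, 3);
      • The second argument gives the attributes of the barrier object; NULL selects the defaults
      • The third argument is the number of threads to wait for; 3 means that execution continues once three threads have reached the barrier
  • Block threads at the barrier
    • pthread_barrier_wait(&b);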
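The barrier calls listed above can be put together into a small pi computation. This is a sketch under assumptions, not code from the article: `NUM_WORKERS`, `Part`, `worker` and `pi_with_barrier` are hypothetical names, n is assumed to be divisible by 3, and the platform is assumed to provide pthread barriers (they are an optional POSIX feature; Linux has them, while mac os, as far as I know, does not).

```cpp
#include <pthread.h>
#include <cassert>
#include <cstddef>

#define NUM_WORKERS 3                   // matches the count of 3 used in the list above

static pthread_barrier_t barrier;
static double partial[NUM_WORKERS];     // one slot per thread, so no two threads write the same element
static double total = 0.0;

// Work description for one thread: its slot and its half-open index range [first, last).
struct Part {
    int id;
    long first;
    long last;
};

void* worker(void* arg) {
    Part* p = (Part*)arg;
    double my_sum = 0.0;
    double factor = (p->first % 2 == 0) ? 1.0 : -1.0;
    for (long i = p->first; i < p->last; i++, factor = -factor)
        my_sum += factor / (2 * i + 1);
    partial[p->id] = my_sum;

    // No thread gets past this call until all NUM_WORKERS threads have reached it.
    int rc = pthread_barrier_wait(&barrier);

    // Exactly one thread receives PTHREAD_BARRIER_SERIAL_THREAD; it combines the results.
    if (rc == PTHREAD_BARRIER_SERIAL_THREAD) {
        for (int i = 0; i < NUM_WORKERS; i++)
            total += partial[i];
    }
    return NULL;
}

double pi_with_barrier(long n) {
    pthread_t threads[NUM_WORKERS];
    Part parts[NUM_WORKERS];
    total = 0.0;
    int rc = pthread_barrier_init(&barrier, NULL, NUM_WORKERS);   // default attributes, 3 threads
    assert(rc == 0);
    (void)rc;
    long per_thread = n / NUM_WORKERS;
    for (int i = 0; i < NUM_WORKERS; i++) {
        parts[i].id = i;
        parts[i].first = per_thread * i;
        parts[i].last = parts[i].first + per_thread;
        pthread_create(&threads[i], NULL, worker, &parts[i]);
    }
    for (int i = 0; i < NUM_WORKERS; i++)
        pthread_join(threads[i], NULL);
    pthread_barrier_destroy(&barrier);
    return 4.0 * total;
}
```

The barrier guarantees that every slot of partial has been written before any thread reads it, so the single combining thread needs no extra lock.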