欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

生产者与消费者模型-有界缓冲区

程序员文章站 2022-04-15 21:57:36
问题描述 生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多进程同步问题的经典案例。 该问题描述了共享固定大小缓冲区的两个进程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生 ......

 问题描述

   生产者消费者问题(英语:producer-consumer problem),也称有限缓冲问题(英语:bounded-buffer problem),是一个多进程同步问题的经典案例。  该问题描述了共享固定大小缓冲区的两个进程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据 放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。

要解决该问题,就必须让生产者在缓冲区满时休眠(要么干脆就放弃数据),等到下次消费者消耗缓冲区中的数据的时候,生产者才能被唤醒,开始往缓冲区添加数据。同样,也可以让消费者在缓冲区空时进入休眠,等到生产者往缓冲区添加数据之后,再唤醒消费者。通常采用进程间通信的方法解决该问题,常用的方法有信号灯等。如果解决方法不够完善,则容易出现死锁的情况。出现死锁时,两个线程都会陷入休眠,等待对方唤醒自己。该问题也能被推广到多个生产者和消费者的情形。

 

 代码要求

 

  • 三个线程,两个缓冲区 第一个线程往缓冲区a中put,第二个线程从缓冲区a中get,然后put到缓冲区b中;第三个线程从缓冲区b中get。第一个线程相当于纯生产者,第三个线程相当于纯消费者,第二个线程相当于既是生产者又是消费者。
  • 用c++类封装信号量相关的api函数,实现一个名为semaphore的类,提供两个成员 函数:p()和v(); 

    semaphore s(8);

    s.p();

    s.v();

  • 用两个锁分别保护head和tail

     

 实现临界区互斥访问的方法之一  信号量法

 

  概念上信号量是表示无力资源数量的实体,它是一个与队列有关的整型变量,实现上,信号量是一种记录型数据结构,有两个分量,一个是信号量的值,一个是等待该信号量的进程队列的头指针。

 

 实验代码

 

  1 #include <windows.h>
  2 #include <iostream>
  3 
  4 
  5 using namespace std;
  6 
  7 
  8 class semaphore {
  9 private:
 10     handle ssemaphore;
 11 public:
 12     semaphore(int m, int n) {        
 13         ssemaphore = createsemaphore(null, m, n, null);         //创建信号量
 14     }
 15     ~semaphore() {                                               //销毁信号量
 16         closehandle(ssemaphore);
 17     }
 18     void p() {                                                     //p操作 
 19         waitforsingleobject(ssemaphore, infinite);
 20     }
 21         
 22     void v() {                                                    //v操作 
 23         releasesemaphore(ssemaphore, 1, null);
 24     }
 25 
 26 };
 27 
 28 
 29 class buffer {
 30     static const int size = 100;
 31     private:
 32         int cells[size];
 33         int tail;
 34         int head;
 35         int num;
 36 
 37         semaphore semaphore_full_cell;        //空格子
 38         semaphore semaphore_empty_cell;        //满格子
 39         semaphore mutex;                    
 40         semaphore mutex_tail;//保护尾部信号量
 41     public:
 42         buffer()
 43         : num(0), head(0), tail(0), semaphore_full_cell(0, size),
 44             semaphore_empty_cell(size, size), mutex(1, 1), mutex_tail(1, 1){}
 45         ~buffer(){}
 46 
 47         bool put(int x)
 48         {
 49             semaphore_empty_cell.p();
 50             mutex_tail.p();
 51             if (num == size) {
 52             return false;
 53         }
 54             cells[tail] = x;
 55             tail = (tail + 1) % size;
 56             num++;
 57             mutex_tail.v();
 58             semaphore_full_cell.v();
 59             return true;
 60         }
 61 
 62         bool get(int& x)
 63         {
 64             semaphore_full_cell.p();
 65             mutex.p();
 66             if (num == 0) {
 67                 return false;
 68             }
 69             x = cells[head];
 70             head = (head + 1) % size;
 71             num--;
 72             mutex.v();
 73             semaphore_empty_cell.v();
 74             return true;
 75         }
 76 };
 77 
 78 
 79 buffer a;        //a缓存区
 80 buffer b;        //b缓存区
 81 
 82 
 83 dword winapi producer(lpvoid)            //生产者线程
 84 {
 85     for (int i = 0; i < 200; i++) {
 86         bool ok = a.put(i);
 87         if (!ok) {
 88             cout << getcurrentthreadid() << " put: " << i << endl;
 89         }
 90     }
 91     return 0;
 92 }
 93 
 94 
 95 dword winapi consumer(lpvoid)            //消费者线程
 96 {
 97     for (int i = 0; i < 200; i++) {
 98         int x;
 99         bool ok = b.get(x);
100         if (!ok) {
101             cout << getcurrentthreadid() << " get: " << endl;
102         }
103     }
104     return 0;
105 }
106 
107 
108 dword winapi midder(lpvoid)                //即是生产者也是消费者线程
109 {
110     for (int i = 0; i < 200; i++) {
111         int x;
112         bool ok1 = a.get(x);
113         if (!ok1) {
114             cout << getcurrentthreadid() << " get: " << endl;
115         }
116         bool ok = b.put(i);
117         if (!ok) {
118             cout << getcurrentthreadid() << " put: " << i << endl;
119         }
120 
121     }
122     return 0;
123 }
124 
125 
126 int main()
127 {
128     handle thread1 = createthread(null, 0, producer, 0, 0, null);
129     handle thread2 = createthread(null, 0, midder, 0, 0, null);
130     handle thread3 = createthread(null, 0, consumer, 0, 0, null);
131 
132     waitforsingleobject(thread1, infinite);
133     waitforsingleobject(thread2, infinite);
134     waitforsingleobject(thread3, infinite);
135 
136     return 0;
137 }