欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

生产者消费者模型

程序员文章站 2023-12-26 17:06:27
生产者消费者模型 是什么 模型: 就是解决某个问题套路 生产者: 指的是产生数据的一方 (一段代码) 消费者: 指的是处理数据的一方 (一段代码) 生活中到处都是这种模型 例如:饭店 厨师就是生产者 吃饭的人就是消费者 例如: 先爬取网页数据(生产) 在解析网页数据 (消费) 生产者和消费者出啥问题 ......

生产者消费者模型 *****

是什么

模型: 就是解决某个问题套路
生产者: 指的是产生数据的一方 (一段代码)
消费者: 指的是处理数据的一方 (一段代码)

生活中到处都是这种模型

例如:饭店 厨师就是生产者 吃饭的人就是消费者

例如: 先爬取网页数据(生产) 在解析网页数据 (消费)

生产者和消费者出啥问题了?

# 消费任务
def eat(food):
    for i in range(10):
        # 要消费
        time.sleep(random.randint(0, 2))
        print(food,"吃完了!")

# 生产任务
def make_rose():
    for i in range(10):
        # 再生产
        time.sleep(random.randint(0, 2))
        print("第%s盘青椒肉丝制作完成!" % i)
        rose = "第%s盘青椒肉丝" % i
                eat(rose) # 直接调用消费任务

# 开启任务 
make_rose()

生产者和消费,处理速度不平衡,一方快一方慢,导致一方需要等待另一方 整体效率低下

生产者消费者模型解决这个问题的思路:

原本,双方是耦合 在一起,消费必须等待生产者生成完毕在开始处理, 反过来

如果消费消费速度太慢,生产者必须等待其处理完毕才能开始生成下一个数据

解决的方案:

1.将双方分开来.一方专门负责生成,一方专门负责处理

这样一来数据就不能直接传递了 因为消费者可能还没有处理完成,为了使生产者可以不断的生成,则需要一个共同的容器

2.生产者完成后放入容器,消费者从容器中取出数据

这样就解决了双方能力不平衡的问题,做的快的一方可以继续做,不需要等待另一方

案例:

def eat(q):
    for i in range(10):
        # 要消费
        rose = q.get()
        time.sleep(random.randint(0, 2))
        print(rose,"吃完了!")

# 生产任务
def make_rose(q):
    for i in range(10):
        # 再生产
        time.sleep(random.randint(0, 2))
        print("第%s盘青椒肉丝制作完成!" % i)
        rose = "第%s盘青椒肉丝" % i
        # 将生成完成的数据放入队列中
        q.put(rose)

if __name__ == '__main__':
    # 创建一个共享队列
    q = queue()
    make_p = process(target=make_rose,args=(q,))
    eat_p =  process(target=eat,args=(q,))


    make_p.start()
    eat_p.start()

joinablequeue

可翻译:为可join的队列

该队列相比普通的queue的区别在于该对列额外增加的了join函数

join函数的作用:

​ 该函数为阻塞函数,会阻塞直到等待队列中所有数据都被处理完毕。

q = joinablequeue()
q.put(1) 
q.get()
q.join() #阻塞 等待队列中所有数据都被处理完毕
print("over")

执行以上函数,将导致进程无法结束,注释掉join调用就正常,发现join的确有阻塞的效果,

但是队列中一共就一个数据,明明已经调用get取出了,为什么join依然阻塞呢?

这是因为get仅仅是取出数据,而join是等待数据处理完毕,也就是说:

取出数据还不算完,你处理完以后必须告知队列处理完毕,通过task_done

q = joinablequeue()
q.put(1) 

q.get()
q.task_done() # 数据处理完毕

q.join() #阻塞 等待队列中所有数据都被处理完毕
print("over")
#输出:
#   over

需要注意的时,task_done的调用次数必大于等于队列中的数据个数,join才能正常结束阻塞

q = joinablequeue()
q.put(1) 
q.put(1) 

q.get()
q.task_done() # 数据处理完毕


q.join() #阻塞 等待队列中所有数据都被处理完毕
print("over")
#输出:
#   over

总结:

主进程可以明确知道队列中的数据何时被处理完毕

守护进程与joinablequeue的应用

回顾之前的生产者消费者模型中,生产者与消费者都明确要处理的数据数量,但是实际开发中很多情况是无法提前明确的,例如:要爬去一个网站上的所有页面,页面数量数不固定的

from multiprocessing import process,joinablequeue,queue
import  time,random
def producter(name,q):
    for i in range(5):
        time.sleep(random.randint(1,2))
        print("\033[46m%s生产了 热狗%s\033[0m" % (name,i))
        q.put("%s的 热狗%s" % (name,i))


def customer(name,q):
    while true:
        time.sleep(random.randint(1, 2))
        hot_dog = q.get()
        print("\033[47m%s 吃掉了 %s \033[0m" % (name,hot_dog))

if __name__ == '__main__':

    q = queue()

    p1 = process(target=producter,args=("北京

                    
                

上一篇:

下一篇: