生产者消费者模型
生产者消费者模型 *****
是什么
模型: 就是解决某个问题套路
生产者: 指的是产生数据的一方 (一段代码)
消费者: 指的是处理数据的一方 (一段代码)
生活中到处都是这种模型
例如:饭店 厨师就是生产者 吃饭的人就是消费者
例如: 先爬取网页数据(生产) 在解析网页数据 (消费)
生产者和消费者出啥问题了?
# 消费任务 def eat(food): for i in range(10): # 要消费 time.sleep(random.randint(0, 2)) print(food,"吃完了!") # 生产任务 def make_rose(): for i in range(10): # 再生产 time.sleep(random.randint(0, 2)) print("第%s盘青椒肉丝制作完成!" % i) rose = "第%s盘青椒肉丝" % i eat(rose) # 直接调用消费任务 # 开启任务 make_rose()
生产者和消费,处理速度不平衡,一方快一方慢,导致一方需要等待另一方 整体效率低下
生产者消费者模型解决这个问题的思路:
原本,双方是耦合 在一起,消费必须等待生产者生成完毕在开始处理, 反过来
如果消费消费速度太慢,生产者必须等待其处理完毕才能开始生成下一个数据
解决的方案:
1.将双方分开来.一方专门负责生成,一方专门负责处理
这样一来数据就不能直接传递了 因为消费者可能还没有处理完成,为了使生产者可以不断的生成,则需要一个共同的容器
2.生产者完成后放入容器,消费者从容器中取出数据
这样就解决了双方能力不平衡的问题,做的快的一方可以继续做,不需要等待另一方
案例:
def eat(q): for i in range(10): # 要消费 rose = q.get() time.sleep(random.randint(0, 2)) print(rose,"吃完了!") # 生产任务 def make_rose(q): for i in range(10): # 再生产 time.sleep(random.randint(0, 2)) print("第%s盘青椒肉丝制作完成!" % i) rose = "第%s盘青椒肉丝" % i # 将生成完成的数据放入队列中 q.put(rose) if __name__ == '__main__': # 创建一个共享队列 q = queue() make_p = process(target=make_rose,args=(q,)) eat_p = process(target=eat,args=(q,)) make_p.start() eat_p.start()
joinablequeue
可翻译:为可join的队列
该队列相比普通的queue的区别在于该对列额外增加的了join函数
join函数的作用:
该函数为阻塞函数,会阻塞直到等待队列中所有数据都被处理完毕。
q = joinablequeue() q.put(1) q.get() q.join() #阻塞 等待队列中所有数据都被处理完毕 print("over")
执行以上函数,将导致进程无法结束,注释掉join调用就正常,发现join的确有阻塞的效果,
但是队列中一共就一个数据,明明已经调用get取出了,为什么join依然阻塞呢?
这是因为get仅仅是取出数据,而join是等待数据处理完毕,也就是说:
取出数据还不算完,你处理完以后必须告知队列处理完毕,通过task_done
q = joinablequeue() q.put(1) q.get() q.task_done() # 数据处理完毕 q.join() #阻塞 等待队列中所有数据都被处理完毕 print("over") #输出: # over
需要注意的时,task_done的调用次数必大于等于队列中的数据个数,join才能正常结束阻塞
q = joinablequeue() q.put(1) q.put(1) q.get() q.task_done() # 数据处理完毕 q.join() #阻塞 等待队列中所有数据都被处理完毕 print("over") #输出: # over
总结:
主进程可以明确知道队列中的数据何时被处理完毕
守护进程与joinablequeue的应用
回顾之前的生产者消费者模型中,生产者与消费者都明确要处理的数据数量,但是实际开发中很多情况是无法提前明确的,例如:要爬去一个网站上的所有页面,页面数量数不固定的
from multiprocessing import process,joinablequeue,queue import time,random def producter(name,q): for i in range(5): time.sleep(random.randint(1,2)) print("\033[46m%s生产了 热狗%s\033[0m" % (name,i)) q.put("%s的 热狗%s" % (name,i)) def customer(name,q): while true: time.sleep(random.randint(1, 2)) hot_dog = q.get() print("\033[47m%s 吃掉了 %s \033[0m" % (name,hot_dog)) if __name__ == '__main__': q = queue() p1 = process(target=producter,args=("北京