Python全栈之队列详解
程序员文章站
2022-03-02 16:04:01
目录1. lock互斥锁2. 事件_红绿灯效果2.1 信号量_semaphore2.2 事件_红绿灯效果3. queue进程队列4. 生产者消费者模型5. joinablequeue队列使用6. 总结...
1. lock互斥锁
知识点:
lock.acquire()# 上锁 lock.release()# 解锁 #同一时间允许一个进程上一把锁 就是lock 加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲速度却保证了数据安全。 #同一时间允许多个进程上多把锁 就是[信号量semaphore] 信号量是锁的变形: 实际实现是 计数器 + 锁,同时允许多个进程上锁 # 互斥锁lock : 互斥锁就是进程的互相排斥,谁先抢到资源,谁就上锁改资源内容,为了保证数据的同步性 # 注意:多个锁一起上,不开锁,会造成死锁.上锁和解锁是一对.
程序实现:
# ### 锁 lock 互斥锁 from multiprocessing import process,lock """ 上锁和解锁是一对, 连续上锁不解锁是死锁 ,只有在解锁的状态下,其他进程才有机会上锁 """ """ # 创建一把锁 lock = lock() # 上锁 lock.acquire() # lock.acquire() # 连续上锁,造成了死锁现象; print("我在袅袅炊烟 .. 你在焦急等待 ... 厕所进行时 ... ") # 解锁 lock.release() """ # ### 12306 抢票软件 import json,time,random # 1.读写数据库当中的票数 def wr_info(sign , dic=none): if sign == "r": with open("ticket",mode="r",encoding="utf-8") as fp: dic = json.load(fp) return dic elif sign == "w": with open("ticket",mode="w",encoding="utf-8") as fp: json.dump(dic,fp) # dic = wr_info("w",dic={"count":0}) # print(dic , type(dic) ) # 2.执行抢票的方法 def get_ticket(person): # 先获取数据库中实际票数 dic = wr_info("r") # 模拟一下网络延迟 time.sleep(random.uniform(0.1,0.7)) # 判断票数 if dic["count"] > 0: print("{}抢到票了".format(person)) # 抢到票后,让当前票数减1 dic["count"] -= 1 # 更新数据库中的票数 wr_info("w",dic) else: print("{}没有抢到票哦".format(person)) # 3.对抢票和读写票数做一个统一的调用 def main(person,lock): # 查看剩余票数 dic = wr_info("r") print("{}查看票数剩余: {}".format(person,dic["count"])) # 上锁 lock.acquire() # 开始抢票 get_ticket(person) # 解锁 lock.release() if __name__ == "__main__": lock = lock() lst = ["梁新宇","康裕康","张保张","于朝志","薛宇健","韩瑞瑞","假摔先","刘子涛","黎明辉","赵凤勇"] for i in lst: p = process( target=main,args=( i , lock ) ) p.start() """ 创建进程,开始抢票是异步并发程序 直到开始抢票的时候,变成同步程序, 先抢到锁资源的先执行,后抢到锁资源的后执行; 按照顺序依次执行;是同步程序; 抢票的时候,变成同步程序,好处是可以等到数据修改完成之后,在让下一个人抢,保证数据不乱。 如果不上锁的话,只剩一张票的时候,那么所有的人都能抢到票,因为程序执行的速度太快,所以接近同步进程,导致数据也不对。 """
ticket文件
{"count": 0}
2. 事件_红绿灯效果
2.1 信号量_semaphore
# ### 信号量 semaphore 本质上就是锁,只不过是多个进程上多把锁,可以控制上锁的数量 """semaphore = lock + 数量 """ from multiprocessing import semaphore , process import time , random """ # 同一时间允许多个进程上5把锁 sem = semaphore(5) #上锁 sem.acquire() print("执行操作 ... ") #解锁 sem.release() """ def singsong_ktv(person,sem): # 上锁 sem.acquire() print("{}进入了唱吧ktv , 正在唱歌 ~".format(person)) # 唱一段时间 time.sleep( random.randrange(4,8) ) # 4 5 6 7 print("{}离开了唱吧ktv , 唱完了 ... ".format(person)) # 解锁 sem.release() if __name__ == "__main__": sem = semaphore(5) lst = ["赵凤勇" , "沈思雨", "赵万里" , "张宇" , "假率先" , "孙杰龙" , "陈璐" , "王雨涵" , "杨元涛" , "刘一凤" ] for i in lst: p = process(target=singsong_ktv , args = (i , sem) ) p.start() """ # 总结: semaphore 可以设置上锁的数量 , 同一时间上多把锁 创建进程时,是异步并发,执行任务时,是同步程序; """ # 赵万里进入了唱吧ktv , 正在唱歌 ~ # 赵凤勇进入了唱吧ktv , 正在唱歌 ~ # 张宇进入了唱吧ktv , 正在唱歌 ~ # 沈思雨进入了唱吧ktv , 正在唱歌 ~ # 孙杰龙进入了唱吧ktv , 正在唱歌 ~
2.2 事件_红绿灯效果
# ### 事件 (event) """ # 阻塞事件 : e = event()生成事件对象e e.wait()动态给程序加阻塞 , 程序当中是否加阻塞完全取决于该对象中的is_set() [默认返回值是false] # 如果是true 不加阻塞 # 如果是false 加阻塞 # 控制这个属性的值 # set()方法 将这个属性的值改成true # clear()方法 将这个属性的值改成false # is_set()方法 判断当前的属性是否为true (默认上来是false) """ from multiprocessing import process,event import time , random # 1 ''' e = event() # 默认属性值是false. print(e.is_set()) # 判断内部成员属性是否是false e.wait() # 如果是false , 代码程序阻塞 print(" 代码执行中 ... ") ''' # 2 ''' e = event() # 将这个属性的值改成true e.set() # 判断内部成员属性是否是true e.wait() # 如果是true , 代码程序不阻塞 print(" 代码执行中 ... ") # 将这个属性的值改成false e.clear() e.wait() print(" 代码执行中 .... 2") ''' # 3 """ e = event() # wait(3) 代表最多等待3秒; e.wait(3) print(" 代码执行中 .... 3") """ # ### 模拟经典红绿灯效果 # 红绿灯切换 def traffic_light(e): print("红灯亮") while true: if e.is_set(): # 绿灯状态 -> 切红灯 time.sleep(1) print("红灯亮") # true => false e.clear() else: # 红灯状态 -> 切绿灯 time.sleep(1) print("绿灯亮") # false => true e.set() # e = event() # traffic_light(e) # 车的状态 def car(e,i): # 判断是否是红灯,如果是加上wait阻塞 if not e.is_set(): print("car{} 在等待 ... ".format(i)) e.wait() # 否则不是,代表绿灯通行; print("car{} 通行了 ... ".format(i)) """ # 1.全国红绿灯 if __name__ == "__main__": e = event() # 创建交通灯 p1 = process(target=traffic_light , args=(e,)) p1.start() # 创建小车进程 for i in range(1,21): time.sleep(random.randrange(2)) p2 = process(target=car , args=(e,i)) p2.start() """ # 2.包头红绿灯,没有车的时候,把红绿灯关了,省电; if __name__ == "__main__": lst = [] e = event() # 创建交通灯 p1 = process(target=traffic_light , args=(e,)) # 设置红绿灯为守护进程 p1.daemon = true p1.start() # 创建小车进程 for i in range(1,21): time.sleep(random.randrange(2)) p2 = process(target=car , args=(e,i)) lst.append(p2) p2.start() # 让所有的小车全部跑完,把红绿灯炸飞 print(lst) for i in lst: i.join() print("关闭成功 .... ")
事件知识点:
# 阻塞事件 : e = event()生成事件对象e e.wait()动态给程序加阻塞 , 程序当中是否加阻塞完全取决于该对象中的is_set() [默认返回值是false] # 如果是true 不加阻塞 # 如果是false 加阻塞 # 控制这个属性的值 # set()方法 将这个属性的值改成true # clear()方法 将这个属性的值改成false # is_set()方法 判断当前的属性是否为true (默认上来是false)
3. queue进程队列
# ### 进程队列(进程与子进程是相互隔离的,如果两者想要进行通信,可以利用队列实现) from multiprocessing import process,queue # 引入线程模块; 为了捕捉queue.empty异常; import queue # 1.基本语法 """顺序: 先进先出,后进后出""" # 创建进程队列 q = queue() # put() 存放 q.put(1) q.put(2) q.put(3) # get() 获取 """在获取不到任何数据时,会出现阻塞""" # print( q.get() ) # print( q.get() ) # print( q.get() ) # print( q.get() ) # get_nowait() 拿不到数据报异常 """[windows]效果正常 [linux]不兼容""" try: print( q.get_nowait() ) print( q.get_nowait() ) print( q.get_nowait() ) print( q.get_nowait() ) except : #queue.empty pass # put_nowait() 非阻塞版本的put # 设置当前队列最大长度为3 ( 元素个数最多是3个 ) """在指定队列长度的情况下,如果塞入过多的数据,会导致阻塞""" # q2 = queue(3) # q2.put(111) # q2.put(222) # q2.put(333) # q2.put(444) """使用put_nowait 在队列已满的情况下,塞入数据会直接报错""" q2 = queue(3) try: q2.put_nowait(111) q2.put_nowait(222) q2.put_nowait(333) q2.put_nowait(444) except: pass # 2.进程间的通信ipc def func(q): # 2.子进程获取主进程存放的数据 res = q.get() print(res,"<22>") # 3.子进程中存放数据 q.put("刘一缝") if __name__ == "__main__": q3 = queue() p = process(target=func,args=(q3,)) p.start() # 1.主进程存入数据 q3.put("赵凤勇") # 为了等待子进程把数据存放队列后,主进程在获取数据; p.join() # 4.主进程获取子进程存放的数据 print(q3.get() , "<33>")
小提示: 一般主进程比子进程执行的快一些
队列知识点:
# 进程间通信 ipc # ipc inter-process communication # 实现进程之间通信的两种机制: # 管道 pipe # 队列 queue # put() 存放 # get() 获取 # get_nowait() 拿不到报异常 # put_nowait() 非阻塞版本的put q.empty() 检测是否为空 (了解) q.full() 检测是否已经存满 (了解)
4. 生产者消费者模型
# ### 生产者和消费者模型 """ # 爬虫案例 1号进程负责抓取其他多个网站中相关的关键字信息,正则匹配到队列中存储(mysql) 2号进程负责把队列中的内容拿取出来,将经过修饰后的内容布局到自个的网站中 1号进程可以理解成生产者 2号进程可以理解成消费者 从程序上来看 生产者负责存储数据 (put) 消费者负责获取数据 (get) 生产者和消费者比较理想的模型: 生产多少,消费多少 . 生产数据的速度 和 消费数据的速度 相对一致 """ # 1.基础版生产着消费者模型 """问题 : 当前模型,程序不能正常终止 """ """ from multiprocessing import process,queue import time,random # 消费者模型 def consumer(q,name): while true: # 获取队列中的数据 food = q.get() time.sleep(random.uniform(0.1,1)) print("{}吃了{}".format(name,food)) # 生产者模型 def producer(q,name,food): for i in range(5): time.sleep(random.uniform(0.1,1)) # 展示生产的数据 print( "{}生产了{}".format( name , food+str(i) ) ) # 存储生产的数据在队列中 q.put(food+str(i)) if __name__ == "__main__": q = queue() p1 = process( target=consumer,args=(q , "赵万里") ) p2 = process( target=producer,args=(q , "赵沈阳" , "香蕉" ) ) p1.start() p2.start() p2.join() """ # 2.优化模型 """特点 : 手动在队列的最后,加入标识none, 终止消费者模型""" """ from multiprocessing import process,queue import time,random # 消费者模型 def consumer(q,name): while true: # 获取队列中的数据 food = q.get() # 如果最后一次获取的数据是none , 代表队列已经没有更多数据可以获取了,终止循环; if food is none: break time.sleep(random.uniform(0.1,1)) print("{}吃了{}".format(name,food)) # 生产者模型 def producer(q,name,food): for i in range(5): time.sleep(random.uniform(0.1,1)) # 展示生产的数据 print( "{}生产了{}".format( name , food+str(i) ) ) # 存储生产的数据在队列中 q.put(food+str(i)) if __name__ == "__main__": q = queue() p1 = process( target=consumer,args=(q , "赵万里") ) p2 = process( target=producer,args=(q , "赵沈阳" , "香蕉" ) ) p1.start() p2.start() p2.join() q.put(none) # 香蕉0 香蕉1 香蕉2 香蕉3 香蕉4 none """ # 3.多个生产者和消费者 """ 问题 : 虽然可以解决问题 , 但是需要加入多个none , 代码冗余""" from multiprocessing import process,queue import time,random # 消费者模型 def consumer(q,name): while true: # 获取队列中的数据 food = q.get() # 如果最后一次获取的数据是none , 代表队列已经没有更多数据可以获取了,终止循环; if food is none: break time.sleep(random.uniform(0.1,1)) print("{}吃了{}".format(name,food)) # 生产者模型 def producer(q,name,food): for i in range(5): time.sleep(random.uniform(0.1,1)) # 展示生产的数据 print( "{}生产了{}".format( name , food+str(i) ) ) # 存储生产的数据在队列中 q.put(food+str(i)) if __name__ == "__main__": q = queue() p1 = process( target=consumer,args=(q , "赵万里") ) p1_1 = process( target=consumer,args=(q , "赵世超") ) p2 = process( target=producer,args=(q , "赵沈阳" , "香蕉" ) ) p2_2 = process( target=producer,args=(q , "赵凤勇" , "大蒜" ) ) p1.start() p1_1.start() p2.start() p2_2.start() # 等待所有数据填充完毕 p2.join() p2_2.join() # 把none 关键字放在整个队列的最后,作为跳出消费者循环的标识符; q.put(none) # 给第一个消费者加一个none , 用来终止 q.put(none) # 给第二个消费者加一个none , 用来终止 # ...
5. joinablequeue队列使用
# ### joinablequeue 队列 """ put 存放 get 获取 task_done 计算器属性值-1 join 配合task_done来使用 , 阻塞 put 一次数据, 队列的内置计数器属性值+1 get 一次数据, 通过task_done让队列的内置计数器属性值-1 join: 会根据队列计数器的属性值来判断是否阻塞或者放行 队列计数器属性是 等于 0 , 代码不阻塞放行 队列计数器属性是 不等 0 , 意味着代码阻塞 """ from multiprocessing import joinablequeue jq = joinablequeue() jq.put("王同培") # +1 jq.put("王伟") # +2 print(jq.get()) print(jq.get()) # print(jq.get()) 阻塞 jq.task_done() # -1 jq.task_done() # -1 jq.join() print(" 代码执行结束 .... ") # ### 2.使用joinablequeue 改造生产着消费者模型 from multiprocessing import process,queue import time,random # 消费者模型 def consumer(q,name): while true: # 获取队列中的数据 food = q.get() time.sleep(random.uniform(0.1,1)) print("{}吃了{}".format(name,food)) # 让队列的内置计数器属性-1 q.task_done() # 生产者模型 def producer(q,name,food): for i in range(5): time.sleep(random.uniform(0.1,1)) # 展示生产的数据 print( "{}生产了{}".format( name , food+str(i) ) ) # 存储生产的数据在队列中 q.put(food+str(i)) if __name__ == "__main__": q = joinablequeue() p1 = process( target=consumer,args=(q , "赵万里") ) p2 = process( target=producer,args=(q , "赵沈阳" , "香蕉" ) ) p1.daemon = true p1.start() p2.start() p2.join() # 必须等待队列中的所有数据全部消费完毕,再放行 q.join() print("程序结束 ... ")
6. 总结
ipc可以让进程之间进行通信 lock其实也让进程之间进行通信了,多个进程去抢一把锁,一个进程抢到 这 把锁了,其他的进程就抢不到这把锁了,进程通过socket底层互相发 消息,告诉其他进程当前状态已经被锁定了,不能再强了。 进程之间默认是隔离的,不能通信的,如果想要通信,必须通过ipc的 方式(lock、joinablequeue、manager)
本篇文章就到这里了,希望能够给你带来帮助,也希望您能够多多关注的更多内容!