python进程之间的通信——Queue
程序员文章站
2022-04-29 20:23:26
我们知道进程之间的数据是互不影响的,但有时我们需要在进程之间通信,那怎么办呢? 认识Queue 可以使用multiprocessing模块的Queue实现多进程之间的数据传递,Queue本身是一个消息列队程序,首先用一个小实例来演示一下Queue的工作原理: put: get: 说明: get (s ......
我们知道进程之间的数据是互不影响的,但有时我们需要在进程之间通信,那怎么办呢?
认识queue
可以使用multiprocessing模块的queue实现多进程之间的数据传递,queue本身是一个消息列队程序,首先用一个小实例来演示一下queue的工作原理:
put:
from multiprocessing import queue # 创建一个实例,指定最大容量为3,若不指定则无限大(直到内存的尽头)。 q = queue(3) q.put("a") q.put("b") q.put("c") # 队列已满,此时程序将被阻塞(停在写入状态),直到从消息列队腾出空间为止。 q.put("d")
get:
from multiprocessing import queue # 创建一个实例,指定最大容量为3,若不指定则无限大(直到内存的尽头)。 q = queue(3) q.put("a") q.put("b") q.put("c") q.get() # 'a' q.get() # 'b' q.get() # 'c' # # 队列为空,此时程序被阻塞,知道队列中再有数据。 q.get()
说明:
- get(self, block=true, timeout=none) 和 put(self, obj, block=true, timeout=none)
- get和put在默认情况是block(阻塞)为true,timeout(超时时间)=none,只要队列中没有数据或者空队列时一直被阻塞。
- block设为false(关闭阻塞),timeout保持默认时,只要队列中没有数据或队满就立即报异常。
get(false)和get_nowait()是等价的,put(要入的的数据,false)和put_nowait(要入队的数据)也是等价的。 - block=false,timeout=2(timeout超时时间的单位是秒) 表示队满或者队空时,等待2s,如果还是队满或队空,那就报异常。表现形式为:get(false,2)、put(要入队的数据,false,2)
使用queue
我们以queue为例,在父进程中创建两个子进程,一个往queue里写数据,一个从queue里读数据:
from multiprocessing import process, queue import os import time import random # 写数据进程执行的代码 def write(q): for value in ["a", "b", "c"]: print("put %s to queue.." % value) q.put(value) time.sleep(random.random()) # 读数据进程执行的代码 def read(q): while true: if not q.empty(): value = q.get() print("get %s to queue.." % value) time.sleep(random.random()) else: break if __name__ == '__main__': # 父进程创建queue,传给各个子进程 q = queue() pw = process(target=write, args=(q,)) pr = process(target=read, args=(q,)) # 启动子进程 pw.start() # 等待写数据的子进程结束 pw.join() pr.start() pr.join() print("所有数据都写完并且读完")
进程池中的queue
如果要使用pool创建进程,就需要使用multiprocessing.manager()中的queue(),而不是multiprocessing.queue(),否则会得到一条如下的错误信息:
runtimeerror: queue objects should only be shared between processes through inheritance.
from multiprocessing import pool, manager import os import time import random # 写数据进程执行的代码 def write(q): for value in ["a", "b", "c"]: print("put %s to queue.." % value) q.put(value) time.sleep(random.random()) # 读数据进程执行的代码 def read(q): while true: if not q.empty(): value = q.get() print("get %s to queue.." % value) time.sleep(random.random()) else: break if __name__ == '__main__': print("(%s) start" % os.getpid()) # 父进程创建queue,传给各个子进程 q = manager().queue() po = pool() po.apply(write, (q,)) po.apply(read, (q,)) po.close() # po.join() # 阻塞式一般不需要 print("(%s) end" % os.getpid())