使用multiprocesss模块进程通信采用队列方式,子进程run执行完一直不能退出的问题
进程间通信常用的方法有信号量、共享内存、消息队列,python的multiprocesss
模块提供了与平台无关的进程相关的API
在项目中使用 multiprocesss
多进程编程时遇到奇怪的问题是: 使用消息队列在不同进程间通信,子进程run方法执行后进程一直不能退出,主进程里调用join方法等待子进程结束,子进程一直不能退出从而导致主进程不能退出
项目背景:
由于在线程对象里不能创建 logger 对象,子进程产生的日志如果需要保存,一种不太推荐的方法是把日志放到消息队列里,子进程结束后主进程把日志从信息队列中读出来。 以下是重现项目中出现问题的代码:
import ctypes
import re
from multiprocessing import Queue
from multiprocessing import Pool
import random
import time
from multiprocessing import Value
from multiprocessing import Process
from src.spider.logmanage import loadLogger
from src.util.mongodbhelper import MongoDBCRUD
class Counsumer(Process):
def __init__(self, queue: Queue, name: str, logqueue: Queue):
super().__init__()
self.queue = queue
self.name = name
self.flag = Value(ctypes.c_bool, False)
self.logqueue = logqueue
def run(self):
print("{} process stared!".format(self.name))
arraylist = []
while True:
# self.flag 为共享内存里的boolean变量,
# 主进程把该变量设置为true时退出循环
if self.flag.value:
print("counsumer.flag = {}".format(self.flag.value))
# 往日志队列里写入 2432B 字符
for j in range(1):
self.logqueue.put("*" * (2048+256+128))
# 从MongoDb里查询集合中文档数量
count = MongoDBCRUD.query_collection_count("person")
print("collection conference has {} documents".format(count))
try:
# 往MongoDb中插入 50 条数据
for i in range(50):
MongoDBCRUD.insert({"name": "Sam{}".format(i), "age": 29+i}, "person")
except Exception as e:
print("操作数据库时发生异常:\n{}\n".format(e))
self.logqueue.put("操作数据库时发生异常:\n{}\n".format(e))
while not MongoDBCRUD.execresultqueue.empty():
self.logqueue.put(MongoDBCRUD.execresultqueue.get())
break
while not self.queue.empty():
arraylist.append(self.queue.get())
if len(arraylist) >= 6:
print("*" * 50)
print(arraylist)
arraylist.clear()
print("comsumer process exit!")
def doMatch(s: str, queue: Queue):
index = random.randint(0, len(s)-4)
queue.put(s[index:index+2])
time.sleep(1)
if __name__ == '__main__':
processpool = Pool(processes=8)
logger = loadLogger("../testunit/applogconfig.ini")
queue = Queue(10)
logqueue = Queue()
counsumer = Counsumer(queue, "consumer_process", logqueue)
# 启动 counsumer 进程
counsumer.start()
param = "0123456789abcdefghijklmnopqrstuvwxyz"
for i in range(15):
processpool.apply_async(doMatch(param, queue))
processpool.close()
processpool.join()
# 进程池里的任务全部结束,把共享内存里的flag标志位置为True
counsumer.flag.value = True
# 等待 counsumer 进程结束
counsumer.join()
# 读取子进程里产生的日志队列数据
while not logqueue.empty():
logger.debug(logqueue.get())
print("main process done!")
运行结果如下:
程序中创建了8个进程的进程池,往进程池里放入15个任务,每个任务往消息队列queue
里放一个运算完成的结果,在子进程counsumer 里每当检测到queue
里消息数量大于等于6就把消息读取出来处理,当进程池里的所有任务处理完,把counsumer
对象的共享内存变量 flag 置为 true ,让子进程退出循环,退出循环前往日志队列里写入2.4k字节的数据; 退出循环后可以看到在控制台里打印了run() 方法最后一句,但程序没有完全退出,使用任务管理器查看可以看到还有两个进程一直没有退出,由于进程池结束了,所以这两个进程应该是 主进程 和 counsumer进程
修改 run() 方法里的 self.logqueue.put("*" * (2048+256+128))
这句, 把这句改为self.logqueue.put("*" * (2048+256))
后再运行的结果如下:
从上面的结果不难看出:
- 虽然说构造 Queue( )如果不指定队列
maxsize
默认是队列是无大小限制,但从上面奇怪的问题来看,如果往队列里放入大对象 将会导致进程无法正常退出,目前还是不清楚进程为什么一直处于阻塞状态 - 通过消息队列在进程间传递日志是一种不好的做法,日志内容量大时容易出现各种各样的问题
上一篇: iOS之线程(二)GCD
下一篇: Python 8数据读写