进程二
一、管道
管道是双向通的,在创建管道时有两个端口,分别定义为conn1,conn2,而这两个端口又可以给多个进程,多个进程就可以通过管道通信。比如主进程的conn1.send()网管道里放数据,主进程的conn2.recv()可以拿值,或者子进程的conn2.recv()也可以拿值,但值只能被拿一次,拿了之后只就没了,还有我们是不可以用子进程的conn1.recv拿值。
from multiprocessing import process,pipe #引入pipe模块 import time def fun(conn1,conn2): #子进程的方法 conn1.close() #关闭子进程的端口conn1 time.sleep(2) # ww=conn1.recv() #用子进程的端口conn1接收数据 # print(ww) ss=conn2.recv() #用子进程的端口conn2接收数据 print(ss) if __name__ == '__main__': conn1,conn2=pipe() #创建一个管道,并定义两个端口分别为conn1,conn2 p=process(target=fun,args=(conn1,conn2)) #创建一个子进程,并把管道的两个端口传给他 p.start() # conn2.close() #因为管道是在主进程中创建,所以直接在主进程用端口,就相当于主进程的管道端口,现在是把主进程的conn2端口给关闭 conn1.send('eeeee') #用主进程的conn1端口往管道里传值 # conn2.send('ggggg') #用主进程的conn2端口往管道里传值
注意:从conn1端口往管道传的值,只能通过conn2拿值,不管是谁的conn2,都可以拿;同样从从conn2传的值,只能从conn1中拿值,不管是谁的conn1.
当所有的conn1都关闭时,不管用谁的conn2传值,还是拿值都会报错
二、数据共享
数据共享就是进程间通信的一种方式,但数据共享也会像利用文件进行进程间通信时,数据会混乱,所以也要利用同步锁来使得同一时间只有一个进程能使用
from multiprocessing import process,manager,lock def fen(l2,lo): #子进程 with lo: #同步锁,虽创建了100个进程,但同一时间只有1个进程能对数据共享对象进行操作 l2[0] -=1 #把列表的第0位做减1操作 if __name__ == '__main__': m=manager() #创建数据共享对象 l2=m.list([100]) #为对象设定一种数据类型,我现在是用的列表 lo=lock() l1=[] for i in range(100): #循环创建100个子进程 p=process(target=fen,args=(l2,lo)) #把数据共享的对象传给子进程 p.start() l1.append(p) [pp.join() for pp in l1] #这步是让主进程等待所有子进程执行完毕 print(l2[0])
三、进程池
为什么要有进程池?进程池的概念。
在程序实际处理问题过程中,忙时会有成千上万的任务需要被执行,闲时可能只有零星任务。那么在成千上万个任务需要被执行的时候,我们就需要去创建成千上万个进程么?首先,创建进程需要消耗时间,销毁进程(空间,变量,文件信息等等的内容)也需要消耗时间。第二即便开启了成千上万的进程,操作系统也不能让他们同时执行,维护一个很大的进程列表的同时,调度的时候,还需要进行切换并且记录每个进程的执行节点,也就是记录上下文(各种变量等等乱七八糟的东西,虽然你看不到,但是操作系统都要做),这样反而会影响程序的效率。因此我们不能无限制的根据任务开启或者结束进程。就看我们上面的一些代码例子,你会发现有些程序是不是执行的时候比较慢才出结果,就是这个原因,那么我们要怎么做呢?
在这里,要给大家介绍一个进程池的概念,定义一个池子,在里面放上固定数量的进程,有需求来了,就拿一个池中的进程来处理任务,等到处理完毕,进程并不关闭,而是将进程再放回进程池中继续等待任务。如果有很多任务需要执行,池中的进程数量不够,任务就要等待之前的进程执行任务完毕归来,拿到空闲进程才能继续执行。也就是说,池中进程的数量是固定的,那么同一时间最多有固定数量的进程在运行。这样不会增加操作系统的调度难度,还节省了开闭进程的时间,也一定程度上能够实现并发效果
在进程池中,当主进程代码执行完,不管子进程有没有执行完都会结束,相当于进程池中的子进程都变为了守护进程,会跟随主进程的结束而结束
from multiprocessing import process,pool import time def fun(i): sum=0 for j in range(5): sum +=i if __name__ == '__main__': p=pool(4) #创建进程池,设定进程池中最多允许4个进程在执行 starr_time=time.time() #这是没用进程池 l1=[] for i in range(500): p1=process(target=fun,args=(i,)) l1.append(p1) p1.start() [pp.join() for pp in l1] end_time=time.time() print(‘没用’,end_time-starr_time) s_time=time.time() #从这里是开始用进程池处理任务 p.map(fun,range(500)) #用的是进程池的map()方法,后面的一个参数必须是可迭代的对象,而且自带进程池close()和join()方法 e_time=time.time() print(‘用’,e_time-s_time)
最后打印结果是‘没用’是15秒左右,‘用’0.00099秒左右,差距是很大的,进程池的作用就凸显的很大,为什么会产生这样的差距,使用进程池只需创建最开始的4个进程,而没用进程池需要创建500个进程,这之间相差很大,这是其中一个原因
进程池同步方法:就是把进程搞成同步的,只有一个执行完了,下一个进程才能执行,主要运用apply()方法
from multiprocessing import process,pool import time def fun(i): sum=0 time.sleep(1) for n in range(5): sum += i return sum if __name__ == '__main__': p=pool(4) for i in range(10): res=p.apply(fun,args=(i,)) #调用进程池的apply()方法,虽是10个进程,但是只能一个接着一个执行,此时返回的一个值 print(res)
进程池异步方法:运用apply_async()方法
from multiprocessing import pool,process import time def fun(i): sum=0 time.sleep(1) for n in range(5): sum +=i return sum if __name__ == '__main__': p=pool(4) l1=[] for i in range(10): res=p.apply_async(fun,args=(i,)) #调用进程池的apply_async()方法,由于进程池只能允许有4个进程,所以,相当于同时有4个进程在进程池执行,但现在返回的不是值,而是一个对象 l1.append(res) #我们把对象放在一个列表,等所有程序结束再从列表拿值,如现在就用res.get()去拿值,就会形成阻塞,从而演变成进程一个接一个执行,变成同步执行的 p.close() p.join() for res in l1: print(res.get())
回调函数:callback,当一个函数需要子进程执行完后的值作为参数,我们就可以把这个函数搞成回调函数
from multiprocessing import pool import time def fun1(a,b): return a+b def fun2(i): print('我是回调函数,%s'%i) if __name__ == '__main__': p=pool(4) p.apply_async(fun1,args=(2,3),callback=fun2) #这样会把fun1的返回值作参数传入fun2中 time.sleep(2)
但一个问题是,若不在主进程中写入一个延迟2秒,当主进程代码走完,而子进程还没执行完,从而子进程会随着主进程结束而结束,从而回调函数也死亡
上一篇: 算法竞赛入门经典5.1 从c到c++
下一篇: .gitignore
推荐阅读
-
.Net Core 项目发布到Linux - CentOS 7(二)用Supervisor守护netcore进程
-
操作系统学习(二)--进程描述和执行
-
Glidedsky第二关详细解答(结合进程池)
-
python基础(32):进程(二)
-
一起talk C栗子吧(第一百零一回:C语言实例--使用信号量进行进程间同步与互斥二)
-
python之进程学习(二)
-
python进程、进程池(二)代码部分
-
一起talk C栗子吧(第九十八回:C语言实例--使用消息队列进行进程间通信二)
-
python多进程与多线程(二):使用多进程
-
PostgreSQL服务过程中的那些事二:Pg服务进程处理简单查询三:获取内存快照