欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

进程二

程序员文章站 2022-07-02 13:47:24
一、管道 管道是双向通的,在创建管道时有两个端口,分别定义为conn1,conn2,而这两个端口又可以给多个进程,多个进程就可以通过管道通信。比如主进程的conn1.send()网管道里放数据,主进程的conn2.recv()可以拿值,或者子进程的conn2.recv()也可以拿值,但值只能被拿一次 ......

一、管道

  管道是双向通的,在创建管道时有两个端口,分别定义为conn1,conn2,而这两个端口又可以给多个进程,多个进程就可以通过管道通信。比如主进程的conn1.send()网管道里放数据,主进程的conn2.recv()可以拿值,或者子进程的conn2.recv()也可以拿值,但值只能被拿一次,拿了之后只就没了,还有我们是不可以用子进程的conn1.recv拿值。

from multiprocessing import process,pipe            #引入pipe模块
import time
def fun(conn1,conn2):                               #子进程的方法
    conn1.close()                                   #关闭子进程的端口conn1
    time.sleep(2)
    # ww=conn1.recv()                               #用子进程的端口conn1接收数据
    # print(ww)
    ss=conn2.recv()                                 #用子进程的端口conn2接收数据
    print(ss)
if __name__ == '__main__':
    conn1,conn2=pipe()                             #创建一个管道,并定义两个端口分别为conn1,conn2
    p=process(target=fun,args=(conn1,conn2))        #创建一个子进程,并把管道的两个端口传给他
    p.start()
    # conn2.close()                                 #因为管道是在主进程中创建,所以直接在主进程用端口,就相当于主进程的管道端口,现在是把主进程的conn2端口给关闭
    conn1.send('eeeee')                             #用主进程的conn1端口往管道里传值
    # conn2.send('ggggg')                            #用主进程的conn2端口往管道里传值
注意:从conn1端口往管道传的值,只能通过conn2拿值,不管是谁的conn2,都可以拿;同样从从conn2传的值,只能从conn1中拿值,不管是谁的conn1.
   当所有的conn1都关闭时,不管用谁的conn2传值,还是拿值都会报错

二、数据共享

  数据共享就是进程间通信的一种方式,但数据共享也会像利用文件进行进程间通信时,数据会混乱,所以也要利用同步锁来使得同一时间只有一个进程能使用

from multiprocessing import process,manager,lock
def fen(l2,lo):              #子进程
    with lo:                 #同步锁,虽创建了100个进程,但同一时间只有1个进程能对数据共享对象进行操作
        l2[0] -=1            #把列表的第0位做减1操作
if __name__ == '__main__':
    m=manager()                       #创建数据共享对象
    l2=m.list([100])                  #为对象设定一种数据类型,我现在是用的列表
    lo=lock()
    l1=[]
    for i in range(100):            #循环创建100个子进程
        p=process(target=fen,args=(l2,lo))        #把数据共享的对象传给子进程
        p.start()
        l1.append(p)
    [pp.join() for pp in l1]            #这步是让主进程等待所有子进程执行完毕
    print(l2[0])

三、进程池

  为什么要有进程池?进程池的概念。

  在程序实际处理问题过程中,忙时会有成千上万的任务需要被执行,闲时可能只有零星任务。那么在成千上万个任务需要被执行的时候,我们就需要去创建成千上万个进程么?首先,创建进程需要消耗时间,销毁进程(空间,变量,文件信息等等的内容)也需要消耗时间。第二即便开启了成千上万的进程,操作系统也不能让他们同时执行,维护一个很大的进程列表的同时,调度的时候,还需要进行切换并且记录每个进程的执行节点,也就是记录上下文(各种变量等等乱七八糟的东西,虽然你看不到,但是操作系统都要做),这样反而会影响程序的效率。因此我们不能无限制的根据任务开启或者结束进程。就看我们上面的一些代码例子,你会发现有些程序是不是执行的时候比较慢才出结果,就是这个原因,那么我们要怎么做呢?

  在这里,要给大家介绍一个进程池的概念,定义一个池子,在里面放上固定数量的进程,有需求来了,就拿一个池中的进程来处理任务,等到处理完毕,进程并不关闭,而是将进程再放回进程池中继续等待任务。如果有很多任务需要执行,池中的进程数量不够,任务就要等待之前的进程执行任务完毕归来,拿到空闲进程才能继续执行。也就是说,池中进程的数量是固定的,那么同一时间最多有固定数量的进程在运行。这样不会增加操作系统的调度难度,还节省了开闭进程的时间,也一定程度上能够实现并发效果

  在进程池中,当主进程代码执行完,不管子进程有没有执行完都会结束,相当于进程池中的子进程都变为了守护进程,会跟随主进程的结束而结束

from multiprocessing import process,pool
import time
def fun(i):
    sum=0
    for j in range(5):
        sum +=i
if __name__ == '__main__':
    p=pool(4)                     #创建进程池,设定进程池中最多允许4个进程在执行
    starr_time=time.time()        #这是没用进程池
    l1=[]
    for i in range(500):
        p1=process(target=fun,args=(i,))
        l1.append(p1)
        p1.start()
    [pp.join() for pp in l1]
    end_time=time.time()
    print(‘没用’,end_time-starr_time)
    s_time=time.time()                  #从这里是开始用进程池处理任务
    p.map(fun,range(500))          #用的是进程池的map()方法,后面的一个参数必须是可迭代的对象,而且自带进程池close()和join()方法
    e_time=time.time()
    print(‘用’,e_time-s_time)
最后打印结果是‘没用’是15秒左右,‘用’0.00099秒左右,差距是很大的,进程池的作用就凸显的很大,为什么会产生这样的差距,使用进程池只需创建最开始的4个进程,而没用进程池需要创建500个进程,这之间相差很大,这是其中一个原因

  进程池同步方法:就是把进程搞成同步的,只有一个执行完了,下一个进程才能执行,主要运用apply()方法

from multiprocessing import process,pool
import time
def fun(i):
    sum=0
    time.sleep(1)
    for n in range(5):
        sum += i
    return sum
if __name__ == '__main__':
    p=pool(4)
    for i in range(10):
        res=p.apply(fun,args=(i,))        #调用进程池的apply()方法,虽是10个进程,但是只能一个接着一个执行,此时返回的一个值
        print(res)

  进程池异步方法:运用apply_async()方法

from multiprocessing import pool,process
import time
def fun(i):
    sum=0
    time.sleep(1)
    for n in range(5):
        sum +=i
    return sum
if __name__ == '__main__':
    p=pool(4)
    l1=[]
    for i in range(10):
        res=p.apply_async(fun,args=(i,))      #调用进程池的apply_async()方法,由于进程池只能允许有4个进程,所以,相当于同时有4个进程在进程池执行,但现在返回的不是值,而是一个对象
        l1.append(res)                       #我们把对象放在一个列表,等所有程序结束再从列表拿值,如现在就用res.get()去拿值,就会形成阻塞,从而演变成进程一个接一个执行,变成同步执行的
    p.close()
    p.join()
    for res in l1:
        print(res.get())

  回调函数:callback,当一个函数需要子进程执行完后的值作为参数,我们就可以把这个函数搞成回调函数

from multiprocessing import pool
import time
def fun1(a,b):
    return a+b
def fun2(i):
    print('我是回调函数,%s'%i)
if __name__ == '__main__':
    p=pool(4)
    p.apply_async(fun1,args=(2,3),callback=fun2)    #这样会把fun1的返回值作参数传入fun2中
    time.sleep(2)
但一个问题是,若不在主进程中写入一个延迟2秒,当主进程代码走完,而子进程还没执行完,从而子进程会随着主进程结束而结束,从而回调函数也死亡