欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

python进程、进程池(二)代码部分

程序员文章站 2023-12-24 09:08:21
创建进程,守护进程,进程锁Lock,进程通信IPC,JoinableQueue模块简介,进程池 ......

第一种创建进程的方式:

from multiprocessing import process
def f(name):
    print(name,"在子进程")
if __name__ == "__main__":
    p = process(target=f,args=("aaa",))
    p.start()
    print("执行主进程内容")

# 打印内容如下
执行主进程内容
aaa 在子进程

从打印结果我们可以看出程序先执行了主进程的print之后才执行了子进程的print。这里主要是因为操作系统在开辟进程时需要花费一定的时间,所以程序在这段时间里,先执行了主进程的print,然后才执行子进程print。

第二种方式创建进程:

from multiprocessing import process

class myprocess(process):
   # 这里必须要调用process中的init初始化参数
   # 否则会因为无法传参导致错误
   def __init__(self,name):
      super().__init__()  # 必须有
      self.name = name
   def run(self):
      print(f"我是子进程{self.name}")
if __name__ == "__main__":
   p = myprocess("aaa")
   p.start()
   print("我是主进程")

# 打印内容如下
我是主进程
我是子进程aaa

可以在创建子进程后使用join的方法,使程序等待子进程结束后在执行join下面的代码。

from multiprocessing import process
def f(name):
    print("我是子进程",name)
if __name__ == "__main__":
    p = process(target=f,args=("aaa",))
    p.start()
    p.join()  # 子进程结束后在,执行下面的代码
    print("我是主进程")

# 打印内容如下
我是子进程 aaa
我是主进程

使用os模块查看进程pid号。

from multiprocessing import process
import os
def f():
    print(f"父进程pid:{os.getppid()},子进程pid:{os.getpid()}")
if __name__ == "__main__":
    p = process(target=f,args=())
    p.start()
    p.join()  # 子进程结束后在,执行下面的代码
    print("主进程内容")

# 打印内容如下
父进程pid:1588,子进程pid:3292
主进程内容

执行多个进程:

from multiprocessing import process
def f(process_name):
    print(f"{process_name}")
if __name__ == "__main__":
    for i in range(3):
        p = process(target=f, args=("子进程-"+str(i),))
        p.start()
    print("主进程")

# 打印内容如下
主进程
子进程-0
子进程-1
子进程-2

我们会发现主进程比所有子进程都优先执行了,如果我们想要在执行完所有子进程在执行父进程该怎么办呢?没错是使用join

示例一:

from multiprocessing import process
import time
def f(process_name):
    print(f"{process_name}")
if __name__ == "__main__":
    start_time = time.time()
    for i in range(3):
        p = process(target=f, args=("子进程-"+str(i),))
        p.start()
        p.join()
    end_time = time.time()
    print(f"执行了{end_time - start_time}")

# 打印内容如下
子进程-0
子进程-1
子进程-2
执行了0.4480257034301758

从打印结果我们可以看出是所有子进程运行后才执行了主进程。但是发现会很慢,这是因为我们的join把原本应多进程同时运行的程序(异步),变成了同步,必须等待一个子进程结束后才会执行下一个子进程。这样就违背了我们多进程同时执行的初衷,所以我们的join不能放在那个位置。

下面请看示例二:

from multiprocessing import process
import time
def f(process_name):
    print(f"{process_name}")
if __name__ == "__main__":
    start_time = time.time()
    pro_list = []
    for i in range(3):
        p = process(target=f, args=("子进程-"+str(i),))
        p.start()
        pro_list.append(p) # 将进程对象添加到一个列表中

    for i in pro_list: # 循环等待所有进程结束
        i.join()
    end_time = time.time()
    print(f"执行了{end_time - start_time}")

# 打印内容如下
子进程-1
子进程-2
子进程-0
执行了0.18201017379760742

对比示例一和示例二我们可以明显发现示例二真正实现了多个进程的并发效果。

进程的创建(第二种方式创建,很少有人用)

import os
from multiprocessing import process
class myprocess(process):
    def __init__(self,name):
        super().__init__()
        self.name = name
    def run(self):
        print(f"子进程-{self.name},pid:{os.getpid()}")

if __name__ == "__main__":
    p1 = myprocess("aaa")
    p2 = myprocess("bbb")
    p3 = myprocess("ccc")
    p1.start()
    p2.start()
    p3.start()
    print("主线程")

# 打印内容如下
子进程-aaa,pid:7360
子进程-bbb,pid:6956
子进程-ccc,pid:4912

虽然不常用,但是最好知道有这种方式可以创建线程。

守护进程:

有两个特性:

1、守护进程会在主进程代码执行结束后就终止。

2、守护进程内无法再开启子进程,否则抛出异常。

创建守护进程比较简单如下:

from multiprocessing import process
def f():
    print("守护进程")
if __name__ == "__main__":
    p = process(target=f,args=())
    p.daemon=true  # 一定要在start前执行daemom=true
    p.start()
    print("主进程")

# 打印内容如下
主进程

我们发现守护进程并没有被执行,或者说还没来得及执行就结束了,我们知道操作系统在开启进程时要花费一定时间,在这个时间内主进程代码执行完了,所以守护进程还没来得及执行就结束了。可以使用join来等待守护进程执行完毕后在结束主进程。

from multiprocessing import process
def f():
    print("守护进程")
if __name__ == "__main__":
    p = process(target=f,args=())
    p.daemon=true  # 一定要在start前执行daemom=true
    p.start()
    p.join()  # 等待守护进程结束
    print("主进程")

# 打印内容如下
守护进程
主进程

进程锁:

为保证数据的安全性,在有些场合要使用进程锁,进程锁会使由原来的并行变成串行,程序效率会下降,但是却保证了数据的安全性,在数据安全性和程序效率面前,数据的安全性是大于程序的效率的。

下面以抢票为例,现在票数还有一张:

python进程、进程池(二)代码部分

 

from multiprocessing import process
import time,json
def search(name):  # 查票
    di = json.load(open("db"))
    print(f"{name}查票,剩余票数{di['count']}")

def get(name):  # 购票
    di = json.load(open("db"))
    time.sleep(0.1)
    if di["count"] > 0:
        di["count"] -= 1
        time.sleep(0.2)
        json.dump(di,open("db","w"))
        print(f"{name}购票成功")
def task(name):
    search(name)
    get(name)
if __name__ == "__main__":
    for i in range(5):  # 只模拟5个人抢一张票
        p = process(target=task,args=("游客-"+str(i),))
        p.start()

# 打印结果如下
游客-2查票,剩余票数1
游客-1查票,剩余票数1
游客-0查票,剩余票数1
游客-4查票,剩余票数1
游客-3查票,剩余票数1
游客-2购票成功
游客-1购票成功
游客-0购票成功
游客-4购票成功
游客-3购票成功

所有人全部购票成功,这就对数据的安全性提出了挑战。本来只有一张票,但是5个人都显示购票成功,这当然不是我们想要的结果,问题的原因在于,所有的游客在差不多同一时间都进行了购票,大家看到的票数都是1张,第一个用户购票后,将票数减1等于0还没来得及将结果写入文件,其它用户也进行了购票的操作,在余票0被写入文件的过程中,其它用户也购票成功,并将结果写入文件,造成了数据的混乱。

这里我们使用进程锁lock也叫互斥锁,来解决问题。

from multiprocessing import process,lock
import time,json

def search(name):  # 查票
    di = json.load(open("db"))
    print(f"{name}查票,剩余票数{di['count']}")

def get(name):  # 购票
    di = json.load(open("db"))
    time.sleep(0.1)
    if di["count"] > 0:
        di["count"] -= 1
        time.sleep(0.2)
        json.dump(di,open("db","w"))
        print(f"{name}购票成功")
def task(name,lock):
    search(name)  # 查票
    lock.acquire() # 加锁
    get(name)   # 购票
    lock.release()  # 解锁
if __name__ == "__main__":
    lock = lock()  # 获取锁
    for i in range(5):  # 只模拟5个人抢一张票
        p = process(target=task,args=("游客-"+str(i),lock))
        p.start()

# 打印内容如下
游客-0查票,剩余票数1
游客-1查票,剩余票数1
游客-2查票,剩余票数1
游客-3查票,剩余票数1
游客-4查票,剩余票数1
游客-0购票成功

在购票时加一个互斥锁,这样一个进程在购票时,其它的进程只能查看就不能进行购票的操作了,保证了数据的安全性,最终结果是正确的,这就是为什么我们明明看到有票,但是点击购买后却说没票的原因,虽然加锁后使原本并行的程序,变成了串行。但我们要知道在不能保证数据安全的情况下一切效率都是空谈。

进程间的通信ipc(inter-process communication)队列

队列:queue是多进程安全的队列,可以使用queue实现多进程之间的数据传递。

队列的常用方法:

queue([maxsize]) 
创建共享的进程队列。maxsize是队列中允许的最大数值。如果省略此参数,则无大小限制。底层队列使用管道和锁定实现。另外,还需要运行支持线程以便队列中的数据传输到底层管道中。 
queue的实例q具有以下方法:
q.get( [ block [ ,timeout ] ] ) 
返回q中的一个项目。如果队列为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为true. 如果设置为false,将引发queue.empty异常(定义在queue模块中)。timeout是可选超时时间,用在阻塞模式中。如果在制定的时间间隔内没有项目变为可用,将引发queue.empty异常。
q.get_nowait( ) 同q.get(false)方法。
q.put(item [, block [,timeout ] ] ) 
将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默认为true。如果设置为false,将引发queue.empty异常(定义在queue库模块中)。timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发queue.full异常。
q.qsize() 
返回队列中目前项目的正确数量。此函数的结果并不可靠,因为在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。在某些系统上,此方法可能引发notimplementederror异常。
q.empty() 
如果调用此方法时 q为空,返回true。如果其他进程或线程正在往队列中添加项目,结果是不可靠的。也就是说,在返回和使用结果之间,队列中可能已经加入新的项目。
q.full() 
如果q已满,返回为true. 由于线程的存在,结果也可能是不可靠的(参考q.empty()方法)。。
q.close() 
关闭队列,防止队列中加入更多数据。调用此方法时,后台线程将继续写入那些已入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果q被垃圾收集,将自动调用此方法。关闭队列不会在队列使用者中生成任何类型的数据结束信号或异常。例如,如果某个使用者正被阻塞在get()操作上,关闭生产者中的队列不会导致get()方法返回错误。
q.cancel_join_thread() 
不会再进程退出时自动连接后台线程。这可以防止join_thread()方法阻塞。
q.join_thread() 
连接队列的后台线程。此方法用于在调用q.close()方法后,等待所有队列项被消耗。默认情况下,此方法由不是q的原始创建者的所有进程调用。调用q.cancel_join_thread()方法可以禁止这种行为。

下面我们已生产者消费者模型来进行演示:

 

from multiprocessing import process,queue
import time,random

def consumer(name,q):  # 消费者
    while true:
        task = q.get() # 从队列中取出数据
        if task == none:break
        print(f"{name}获取数据{task}")
        time.sleep(random.random())   # 消费者效率比生产者效率高

def producer(name,q):  # 生产者
    for i in range(3):
        q.put(i)  # 向对列中添加数据
        print(f"{name}生产数据{i}")
        time.sleep(random.uniform(1,2))  # 模拟生产者的效率没有消费者效率高

if __name__ == "__main__":
    q = queue()  # 获取一个队列
    pro = []
    for i in range(3):  # 开启生产者进程
        p = process(target=producer,args=("生产者"+str(i),q))
        p.start()
        pro.append(p)
    # 开启消费者进程
    p1 = process(target=consumer,args=("aaa",q))
    p2 = process(target=consumer,args=("bbb",q))
    p1.start()
    p2.start()
    for i in pro:  # 等待生产者结束
        i.join()
  
    q.put(none)  # 有几个消费者进程,就put几次none
    q.put(none)

joinablequeue([maxsize]) 模块
创建可连接的共享进程队列。这就像是一个queue对象,但队列允许项目的使用者通知生产者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。 

joinablequeue的实例p除了与queue对象相同的方法之外,还具有以下方法:
q.task_done() 
使用者使用此方法发出信号,表示q.get()返回的项目已经被处理。如果调用此方法的次数大于从队列中删除的项目数量,将引发valueerror异常。
q.join() 
生产者将使用此方法进行阻塞,直到队列中所有项目均被处理。阻塞将持续到为队列中的每个项目均调用q.task_done()方法为止。 
下面的例子说明如何建立永远运行的进程,使用和处理队列上的项目。生产者将项目放入队列,并等待它们被处理。

我们在来实现上述的生产者消费者模型。

from multiprocessing import process,joinablequeue
import time,random

def consumer(name,q):  # 消费者
    while true:
        task = q.get() # 从队列中取出数据
        q.task_done()  # 通知生产者,我已经取完所有数据了
        print(f"{name}获取数据{task}")
        time.sleep(random.random())   # 消费者效率比生产者效率高

def producer(name,q):  # 生产者
    for i in range(1):
        q.put(i)  # 向对列中添加数据
        print(f"{name}生产数据{i}")
        time.sleep(random.uniform(1,2))  # 模拟生产者的效率没有消费者效率高
    q.join()  # 生产完毕,等待消费者通知数据已经获取完了

if __name__ == "__main__":
    q = joinablequeue()  # 获取一个队列
    pro = []
    for i in range(1):  # 开启生产者进程
        p = process(target=producer,args=("生产者"+str(i),q))
        p.start()
        pro.append(p)
    # 开启消费者进程
    p1 = process(target=consumer,args=("aaa",q))
    p2 = process(target=consumer,args=("bbb",q))
    p1.daemon=true  # 如果不设置守护进程,这两个进程就不会结束。
    p2.daemon=true  # 因为他们只是通知生产者我接收到所有数据了,并没有终止循环。
    p1.start()
    p2.start()
    for i in pro:  # 等待生产者结束
        i.join()

这里再次说明将消费者设置成守护进程的原因,q.task_done它只是通知生产者,我把数据已经都取完了,仅此而已,所以while循环并不会退出。如果不设置守护进程,程序会卡在while循环里。

 

进程池

进程池就是预先创建一个进程组,然后有任务时从池中分配一个进程去执行任务。当任务数量超过进程池的数量时,就必须等待进程池中有空闲的进程时,才能利用空闲的进程去执行任务。

进程池的优点:

1、充分利用cpu资源。

2、多个进程在同一时刻可以同时执行,达到了并行的效果。

进程池的缺点:进程的创建、销毁需要耗费cpu的时间。多进程适用于需要复杂计算少i/o阻塞的情况。如果程序不涉及复杂运算,最好是使用线程池。

关于进程池multiprocessing.pool的一些方法

apply(func [, args [, kwargs]]):
 func(*args,**kwargs),然后返回结果。
需要注意的是:apply属于进程同步的操作,即必须等待一个进程结束后才能执行下一个进程。 
apply_async(func [, args [, kwargs]]):
func(*args,**kwargs),然后返回结果。
apply_async属于进程的异步操作,所有进程可以同时执行,此方法的结果是asyncresult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将立即传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。
p.close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成
p.jion():等待所有工作进程退出。此方法只能在close()或teminate()之后调用

同步进程池的示例:

import os,time
from multiprocessing import pool
def work(n):
    print("pid:%s run" %os.getpid())
    time.sleep(1)
    return n ** 2

if __name__ == "__main__":
    p = pool(3)  # 开启进程池
    res = []
    for i in range(3):
        res.append(p.apply(work,args=(i,)))  # 进程同步模式
    print(res)  # 打印返回结果

 # 打印内容如下
pid:6180 run
pid:9728 run
[0, 1, 4]

因为是进程池的同步,所以进程时的执行顺序是有序的,并且必须一个进程执行后才执行下一个进程。

进程池的异步示例: 

import os,time
from multiprocessing import pool
def work(n):
    print("pid:%s run" %os.getpid())
    time.sleep(1)
    return n ** 2

if __name__ == "__main__":
    p = pool(3)  # 开启进程池
    res = []
    for i in range(5):
        # 进程异步模式
        res.append(p.apply_async(work,args=(i,)))
    # 因为是异步,所以开进程会很快
    # 所以我们所有进程结束后在打印结果
    p.close()  # 关闭进程池
    p.join()   # 等待进程池结束,
    for i in res:
        print(i.get(),end=" ")

# 打印内容如下
pid:7512 run
pid:10176 run
pid:7240 run
pid:10176 run
pid:7512 run
0 1 4 9 16

关于进程池有个问题需要注意,在子进程中不能使用input函数() 

 

上一篇:

下一篇: