Python基础之进程(Process)
进程是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。在早期面向进程设计的计算机结构中,进程是程序的基本执行实体;在当代面向线程设计的计算机结构中,进程是线程的容器。程序是指令、数据及其组织形式的描述,进程是程序的实体。
狭义定义:进程是正在运行的程序的实例(an instance of a computer program that is being executed)。
广义定义:进程是一个具有一定独立功能的程序关于某个数据集合的一次运行活动。它是操作系统动态执行的基本单元,在传统的操作系统中,进程既是基本的分配单元,也是基本的执行单元。
进程概念
- 进程是一个实体。每一个进程都有它自己的地址空间,一般情况下,包括文本区域(text region)、数据区域(data region)和堆栈(stack region)。文本区域存储处理器执行的代码;数据区域存储变量和进程执行期间使用的动态分配的内存;堆栈区域存储着活动过程调用的指令和本地变量。
- 进程是一个“执行中的程序”。程序是一个没有生命的实体,只有处理器赋予程序生命时(操作系统执行之),它才能成为一个活动的实体,我们称其为进程。
- 进程是操作系统中最基本、重要的概念。是多道程序系统出现后,为了刻画系统内部出现的动态情况,描述系统内部各道程序的活动规律引进的一个概念,所有多道程序设计操作系统都建立在进程的基础上。
引入进程原因
- 为了提高资源利用率和系统处理能力,现阶段计算机系统都是多道程序系统,即多道程序并发执行。
- 优化系统资源,方便计算机调度,避免系统运算紊乱。
- 进程是一种数据结构,能够清晰的刻画动态系统的内在规律,增加程序运行时的动态性。
进程特征
- 动态性:进程的实质是程序在多道程序系统中的一次执行过程,进程是动态产生,动态消亡的。
- 并发性:任何进程都可以同其他进程一起并发执行。
- 独立性:进程是一个能独立运行的基本单位,同时也是系统分配资源和调度的独立单位。
- 异步性:由于进程间的相互制约,使进程具有执行的间断性,即进程按各自独立的、不可预知的速度向前推进。
结构组成:程序、数据和进程控制块
多个不同的进程可以包含相同的程序:一个程序在不同的数据集里就构成不同的进程,能得到不同的结果;但是执行过程中,程序不能发生改变。
进程与程序区别
- 程序是指令和数据的有序集合,其本身没有任何运行的含义,是一个静态的概念。
- 而进程是程序在处理机上的一次执行过程,它是一个动态的概念。
- 程序可以作为一种软件资料长期存在,而进程是有一定生命期的。
- 程序是永久的,进程是暂时的。
并行 并发
并行(Parallelism)
并行:指两个或两个以上事件(或线程)在同一时刻发生,是真正意义上的不同事件或线程在同一时刻,在不同CPU资源呢上(多核),同时执行。
特点
- 同一时刻发生,同时执行。
- 不存在像并发那样竞争,等待的概念。
并发(Concurrency)
指一个物理CPU(也可以多个物理CPU) 在若干道程序(或线程)之间多路复用,并发性是对有限物理资源强制行使多用户共享以提高效率。
特点
- 微观角度:所有的并发处理都有排队等候,唤醒,执行等这样的步骤,在微观上他们都是序列被处理的,如果是同一时刻到达的请求(或线程)也会根据优先级的不同,而先后进入队列排队等候执行。
- 宏观角度:多个几乎同时到达的请求(或线程)在宏观上看就像是同时在被处理。
进程调度
FCFS
先来先服务调度法(First Come First Service):按照先后顺序处理事件的一种算法。
该算法既可用于作业调度,也可用于进程调度。FCFS算法比较有利于长作业(进程),而不利于短作业(进程)。由此可知,本算法适合于CPU繁忙型作业,而不利于I/O繁忙型的作业(进程)。
SJF/SPN
短作业优先法(Shortest Job First):又称为短进程优先算法(SPN,Shortest Process Next),能有效减少平均周转时间。
该算法既可用于作业调度,也可用于进程调度。但其对长作业不利;不能保证紧迫性作业(进程)被及时处理;作业的长短只是被估算出来的。
RR
时间片轮转法(Round Robin):让每个进程在就绪队列中的等待时间与享受服务的时间成比例,也就是需要将CPU的处理时间分成固定大小的时间片,如果一个进程在被调度选中之后用完了系统规定的时间片,但又未完成要求的任务,则它自行释放自己所占有的CPU而排到就绪队列的末尾,等待下一次调度。同时,进程调度程序又去调度当前就绪队列中的第一个进程。
显然,轮转法只能用来调度分配一些可以抢占的资源。这些可以抢占的资源可以随时被剥夺,而且可以将它们再分配给别的进程。CPU是可抢占资源的一种。但打印机等资源是不可抢占的。由于作业调度是对除了CPU之外的所有系统硬件资源的分配,其中包含有不可抢占资源,所以作业调度不使用轮转法。
在轮转法中,时间片长度的选取非常重要。首先,时间片长度的选择会直接影响到系统的开销和响应时间。如果时间片长度过短,则调度程序抢占处理机的次数增多。这将使进程上下文切换次数也大大增加,从而加重系统开销。反过来,如果时间片长度选择过长,例如,一个时间片能保证就绪队列中所需执行时间最长的进程能执行完毕,则轮转法变成了先来先服务法。时间片长度的选择是根据系统对响应时间的要求和就绪队列中所允许最大的进程数来确定的。
在轮转法中,加入到就绪队列的进程有3种情况:
- 分给某进程的时间片用完,但进程还未完成,回到就绪队列的末尾等待下次调度去继续执行。
- 分给该进程的时间片并未用完,只是因为请求I/O或由于进程的互斥与同步关系而被阻塞。当阻塞解除之后再回到就绪队列。
- 新创建进程进入就绪队列。
如果对这些进程区别对待,给予不同的优先级和时间片从直观上看,可以进一步改善系统服务质量和效率。例如,我们可把就绪队列按照进程到达就绪队列的类型和进程被阻塞时的阻塞原因分成不同的就绪队列,每个队列按FCFS原则排列,各队列之间的进程享有不同的优先级,但同一队列内优先级相同。这样,当一个进程在执行完它的时间片之后,或从睡眠中被唤醒以及被创建之后,将进入不同的就绪队列。
多级反馈队列
多级反馈队列调度算法则不必事先知道各种进程所需的执行时间,而且还可以满足各种类型进程的需要,因而它是目前被公认的一种较好的进程调度算法。在采用多级反馈队列调度算法的系统中,调度算法的实施过程如下所述。
- 应设置多个就绪队列,并为各个队列赋予不同的优先级。第一个队列的优先级最高,第二个队列次之,其余各队列的优先权逐个降低。该算法赋予各个队列中进程执行时间片的大小也各不相同,在优先权愈高的队列中,为每个进程所规定的执行时间片就愈小。例如,第二个队列的时间片要比第一个队列的时间片长一倍,……,第i+1个队列的时间片要比第i个队列的时间片长一倍。
- 当一个新进程进入内存后,首先将它放入第一队列的末尾,按FCFS原则排队等待调度。当轮到该进程执行时,如它能在该时间片内完成,便可准备撤离系统;如果它在一个时间片结束时尚未完成,调度程序便将该进程转入第二队列的末尾,再同样地按FCFS原则等待调度执行;如果它在第二队列中运行一个时间片后仍未完成,再依次将它放入第三队列,……,如此下去,当一个长作业(进程)从第一队列依次降到第n队列后,在第n 队列便采取按时间片轮转的方式运行。
- 仅当第一队列空闲时,调度程序才调度第二队列中的进程运行;仅当第1~(i-1)队列均空时,才会调度第i队列中的进程运行。如果处理机正在第i队列中为某进程服务时,又有新进程进入优先权较高的队列(第1~(i-1)中的任何一个队列),则此时新进程将抢占正在运行进程的处理机,即由调度程序把正在运行的进程放回到第i队列的末尾,把处理机分配给新到的高优先权进程。
进程状态介绍
状态描述
就绪(Ready)状态:当进程已分配到除CPU以外的所有必要的资源,只要获得处理机便可立即执行,这时的进程状态称为就绪状态。
执行/运行(Running)状态:当进程已获得处理机,其程序正在处理机上执行,此时的进程状态称为执行状态。
阻塞(Blocked)状态:正在执行的进程,由于等待某个事件发生而无法执行时,便放弃处理机而处于阻塞状态。引起进程阻塞的事件可有多种,例如,等待I/O完成、申请缓冲区不能满足、等待信件(信号)等。
import time
print("程序开始") # 程序开始,运行状态
name = input("请输入姓名:") # 用户输入,进入阻塞
print(name) # 运行状态
time.sleep(1) # 睡眠1秒,阻塞状态
print("程序结束") # 运行状态
同步/异步
同步(synchronous): 所谓同步就是一个任务的完成需要依赖另外一个任务时,只有等待被依赖的任务完成后,依赖的任务才能算完成,这是一种可靠的任务序列。
简言之,要么成功都成功,失败都失败,两个任务的状态可以保持一致。
异步(asynchronous):所谓异步是不需要等待被依赖的任务完成,只是通知被依赖的任务要完成什么工作,依赖的任务也立即执行,只要自己完成了整个任务就算完成了。至于被依赖的任务最终是否真正完成,依赖它的任务无法确定,所以它是不可靠的任务序列。
进程创建/结束
创建
但凡是硬件,都需要有操作系统去管理,只要有操作系统,就有进程的概念,就需要有创建进程的方式,一些操作系统只为一个应用程序设计,比如微波炉中的控制器,一旦启动微波炉,所有的进程都已经存在。
而对于通用系统(跑很多应用程序),需要有系统运行过程中创建或撤销进程的能力,主要分为4中形式创建新的进程:
- 系统初始化(查看进程linux中用ps命令,windows中用任务管理器,前台进程负责与用户交互,后台运行的进程与用户无关,运行在后台并且只在需要时才唤醒的进程,称为守护进程,如电子邮件、web页面、新闻、打印)。
- 一个进程在运行过程中开启了子进程(如nginx开启多进程,os.fork,subprocess.Popen等)。
- 用户的交互式请求,而创建一个新进程(如用户双击暴风影音)。
- 一个批处理作业的初始化(只在大型机的批处理系统中应用)。
无论哪一种,新进程的创建都是由一个已经存在的进程执行了一个用于创建进程的系统调用而创建的。
结束
- 正常退出(自愿,如用户点击交互式页面的叉号,或程序执行完毕调用发起系统调用正常退出,在linux中用exit,在windows中用ExitProcess)。
- 出错退出(自愿,python a.py中a.py不存在)。
- 严重错误(非自愿,执行非法指令,如引用不存在的内存,1/0等,可以捕捉异常,try...except...)。
- 被其他进程杀死(非自愿,如kill -9)。
Python中进程操作
multiprocess.Process模块
process模块是一个创建进程的模块,借助这个模块,就可以完成进程的创建。
语法:Process([group [, target [, name [, args [, kwargs]]]]])
由该类实例化得到的对象,表示一个子进程中的任务(尚未启动)。
注意:1. 必须使用关键字方式来指定参数;2. args指定的为传给target函数的位置参数,是一个元祖形式,必须有逗号。
参数介绍:
group:参数未使用,默认值为None。
target:表示调用对象,即子进程要执行的任务。
args:表示调用的位置参数元祖。
kwargs:表示调用对象的字典。如kwargs = {'name':Jack, 'age':18}。
name:子进程名称。
返回值:实例化对象
方法/属性 | 说明 |
---|---|
start() | 启动进程,调用进程中的run()方法。 |
run() | 进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法 。 |
terminate() | 强制终止进程,不会进行任何清理操作。如果该进程终止前,创建了子进程,那么该子进程在其强制结束后变为僵尸进程;如果该进程还保存了一个锁那么也将不会被释放,进而导致死锁。使用时,要注意。 |
is_alive() | 判断某进程是否存活,存活返回True,否则False。 |
join([timeout]) | 主线程等待子线程终止。timeout为可选择超时时间;需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程 。 |
daemon | 默认值为False,如果设置为True,代表该进程为后台守护进程;当该进程的父进程终止时,该进程也随之终止;并且设置为True后,该进程不能创建子进程,设置该属性必须在start()之前 |
name | 进程名称。 |
pid | 进程pid |
exitcode | 进程运行时为None,如果为-N,表示被信号N结束了。 |
authkey | 进程身份验证,默认是由os.urandom()随机生成32字符的字符串。这个键的用途是设计涉及网络连接的底层进程间的通信提供安全性,这类连接只有在具有相同身份验证才能成功。 |
实例1:
import os
from multiprocessing import Process
def func_one():
print("This is son_one")
print("son_one:%s father:%s" % (os.getpid(), os.getppid()))
def func_two():
print("This is son_two")
print("son_two:%s father:%s" % (os.getpid(), os.getppid()))
if __name__ == '__main__':
p_one = Process(target=func_one)
P_two = Process(target=func_two)
p_one.start()
P_two.start()
print("son:%s father:%s" % (os.getpid(), os.getppid())) # father是pycharm
结果:
son:14560 father:8040
This is son_one
This is son_two
son_one:5228 father:14560
son_two:9736 father:14560Process finished with exit code 0
实例2:
import time
from multiprocessing import Process
def func_one(name):
print("My name is", name)
time.sleep(2)
print("This is func_one")
def func_two(name):
print("My name is", name)
time.sleep(2)
print("This is func_two")
if __name__ == '__main__':
p_one = Process(target=func_one, args=("Jack",))
p_two = Process(target=func_two, args=("Mick",))
p_one.start()
p_one.join() # 主线程要等待func_one终止,才继续往下走
p_two.start()
结果:
My name is Jack
This is func_one # 打印这一步之前等待了2秒
My name is Mick
This is func_twoProcess finished with exit code 0
注意:多个进程同时执行的顺序是随机的,不根据执行顺序的来。
我们也可以使用迭代来处理进程的执行:
import time
from multiprocessing import Process
def func_one(name):
print("My name is", name)
time.sleep(1)
def func_two(age):
print("My age is", age)
time.sleep(5)
if __name__ == '__main__':
lst_one = []
lst_two = []
for i in range(5):
p_one = Process(target=func_one, args=('Mike',))
p_two = Process(target=func_two, args=(18,))
p_one.start()
p_two.start()
lst_two.append(p_two)
# [p_two.join() for p_two in lst_two] # 父进程要等子进程结束
print("End")
结果:
End
My name is Mike
My age is 18
My age is 18
My age is 18
My age is 18
My name is Mike
My name is Mike
My age is 18
My name is Mike
My name is MikeProcess finished with exit code 0
去掉注释结果:
My name is Mike
My age is 18
My name is Mike
My name is Mike
My age is 18
My age is 18
My age is 18
My name is Mike
My name is Mike
My age is 18
EndProcess finished with exit code 0
除了上面这些开启进程的方法之外,还有一种以继承Process的方式开启进程的方式:
import os
from multiprocessing import Process
class MyProcess(Process):
def __init__(self, name):
super().__init__()
self.name = name
def run(self):
print("进程为%s,父进程为%s" % (os.getpid(), os.getppid()))
print("我的名字是%s" % self.name)
if __name__ == '__main__':
p_one = MyProcess('张三')
p_two = MyProcess('李四')
p_thr = MyProcess('王五')
p_one.start() # 自动调用run()
p_two.start()
p_thr.run() # 直接调用run()
p_one.join()
p_two.join()
# p_thr.join() # 调用run()函数的不可以调用join()
print("主进程结束")
结果:
进程为22376,父进程为17808
我的名字是王五
进程为15212,父进程为22376
我的名字是张三
进程为20588,父进程为22376
我的名字是李四
主进程结束Process finished with exit code 0
注意:直接调用run()函数的进程不能使用join()方法。
下面我们来演示进程之间数据隔离的问题:
from multiprocessing import Process
def func_one():
global n
n = 0
print("在func_one中的n为%s" % n)
def func_two():
global n
n = 1
print("在func_two中的n为%s" % n)
if __name__ == '__main__':
n = 100
p_one = Process(target=func_one)
p_two = Process(target=func_two)
p_one.start()
p_two.start()
print("主进程的n为%s" % n)
结果:
主进程的n为100
在func_one中的n为0
在func_two中的n为1Process finished with exit code 0
守护进程
守护进程就是会随着主进程的结束而结束的进程,具有以下两个特点:
- 守护进程会在主进程代码执行结束后就终止。
- 守护进程内无法再开启子进程,否则抛出异常。
实例1:
import os
import time
from multiprocessing import Process
class MyProcess(Process):
def __init__(self, name):
super().__init__()
self.name = name
def run(self):
print("进程为%s,父进程为%s" % (os.getpid(), os.getppid()))
print("我的名字是%s" % self.name)
if __name__ == '__main__':
p_one = MyProcess('张三')
p_two = MyProcess('李四')
p_two.daemon = True # 默认为False,必须在start()之前设置
p_one.start()
p_two.start()
time.sleep(5)
print("主进程结束")
结果:
进程为22452,父进程为22344
我的名字是张三
进程为11336,父进程为22344
我的名字是李四
主进程结束Process finished with exit code 0
实例2:
from multiprocessing import Process
import time
def func_one():
print("func_one")
time.sleep(2)
print("End func_one")
def func_two():
print("func_two")
time.sleep(3)
print("End func_two")
if __name__ == '__main__':
p_one = Process(target=func_one)
p_two = Process(target=func_two)
p_one.daemon = True
p_one.start()
p_two.start()
time.sleep(0.1) # 时间太短,导致print("End func_one")无法打印
print("主进程结束")
结果:
func_one
func_two
主进程结束
End func_twoProcess finished with exit code 0
多进程与socket
服务端:
from socket import *
from multiprocessing import Process
HOST = '127.0.0.1'
PORT = 8080
ADDRESS = (HOST, PORT)
BUFF_SIZE = 1024
ss = socket(AF_INET, SOCK_STREAM)
ss.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
ss.bind(ADDRESS)
ss.listen(5)
def talk(conn, add):
while 1:
try:
msg = conn.recv(BUFF_SIZE)
if not msg:
break
conn.send(msg.upper())
except Exception:
break
if __name__ == '__main__':
while 1:
conn, add = ss.accept()
p = Process(target=talk, args=(conn, add))
p.start()
客户端:
from socket import *
HOST = '127.0.0.1'
PORT = 8080
ADDRESS = (HOST, PORT)
BUFF_SIZE = 1024
sc = socket(AF_INET,SOCK_STREAM)
sc.connect(ADDRESS)
while 1:
msg = input(">>>".strip())
if not msg:
continue
sc.send(msg.encode('utf-8'))
msg = sc.recv(BUFF_SIZE)
print(msg.decode('utf-8'))
多进程中的其它方法
terminate()和is_alive()
结束进程和判断进程是否存活。如,
from multiprocessing import Process
import time
import random
class MyProcess(Process):
def __init__(self, name):
super().__init__()
self.name = name
def run(self):
print("我的名字是%s" % self.name)
if __name__ == '__main__':
p = MyProcess('张三')
print("开始")
p.start()
print(p.is_alive())
p.terminate()
p.join()
# time.sleep(2)
print("结束")
print(p.is_alive())
结果:
开始
True
结束
FalseProcess finished with exit code 0
锁——Lock
通过上面的研究,我们千方百计实现了程序的异步,让多个任务可以同时在几个进程中并发处理,他们之间的运行没有顺序,一旦开启也不受我们控制。尽管并发编程让我们能更加充分的利用IO资源,但是也给我们带来了新的问题。
当多个进程使用同一份数据资源的时候,就会引发数据安全或顺序混乱问题:
import os
import time
import random
from multiprocessing import Process
def work(num):
print("%s:%s正在运行" % (num, os.getpid()))
time.sleep(random.random())
print("%s:%s执行完毕" % (num, os.getpid()))
if __name__ == '__main__':
for i in range(10):
p = Process(target=work, args=(i,))
p.start()
结果:
3:11752正在运行
4:19996正在运行
7:1708正在运行
6:8004正在运行
1:19912正在运行
2:6328正在运行
9:21760正在运行
0:22464正在运行
8:21600正在运行
5:21368正在运行
7:1708执行完毕
5:21368执行完毕
4:19996执行完毕
6:8004执行完毕
8:21600执行完毕
3:11752执行完毕
9:21760执行完毕
2:6328执行完毕
1:19912执行完毕
0:22464执行完毕Process finished with exit code 0
但当我们使用锁机制后:
import os
import time
import random
from multiprocessing import Process, Lock
def work(num, lock):
lock.acquire()
print("%s:%s正在运行" % (num, os.getpid()))
time.sleep(random.random())
print("%s:%s执行完毕" % (num, os.getpid()))
lock.release()
if __name__ == '__main__':
lock = Lock()
for i in range(10):
p = Process(target=work, args=(i, lock))
p.start()
结果:
4:232正在运行
4:232执行完毕
3:2608正在运行
3:2608执行完毕
2:12448正在运行
2:12448执行完毕
1:700正在运行
1:700执行完毕
0:5832正在运行
0:5832执行完毕
7:2852正在运行
7:2852执行完毕
5:6248正在运行
5:6248执行完毕
6:14288正在运行
6:14288执行完毕
9:21424正在运行
9:21424执行完毕
8:6588正在运行
8:6588执行完毕Process finished with exit code 0
上面这种情况虽然使用加锁的形式实现了顺序的执行,但是程序又重新变成串行了,这样确实会浪费了时间,却保证了数据的安全。接下来,我们以模拟抢票为例,来看看数据安全的重要性。
from multiprocessing import Process, Lock
import time
import json
import random
def search():
dic = json.load(open('db'))
time.sleep(random.random()) # 模拟读取数据
print("\033[43m剩余票数:%s\033[0m" % dic['count'])
def get():
dic = json.load(open('db'))
time.sleep(random.random()) # 模拟网络延迟
if dic['count'] > 0:
dic['count'] -= 1 # 购票成功后减一
time.sleep(1)
json.dump(dic, open('db', 'w'))
print("\033[43m购票成功\033[0m")
else:
print("尚无余票")
def task():
search()
get()
if __name__ == '__main__':
for i in range(100):
p = Process(target=task)
p.start()
没有使用锁,以上代码会造成错误和紊乱。下面是加锁改进版:
from multiprocessing import Process, Lock
import time
import json
import random
def search():
dic = json.load(open('db'))
time.sleep(random.random()) # 模拟读取数据
print("\033[43m剩余票数:%s\033[0m" % dic['count'])
def get():
dic = json.load(open('db'))
time.sleep(random.random()) # 模拟网络延迟
if dic['count'] > 0:
dic['count'] -= 1 # 购票成功后减一
time.sleep(1)
json.dump(dic, open('db', 'w'))
print("\033[32m购票成功\033[0m")
else:
print("\033[31m尚无余票\033[0m")
def task(lock):
lock.acquire()
search()
get()
lock.release()
if __name__ == '__main__':
lock = Lock()
for i in range(10):
p = Process(target=task, args=(lock,))
p.start()
结果(db文件为{"count": 3}):
剩余票数:3
购票成功
剩余票数:2
购票成功
剩余票数:1
购票成功
剩余票数:0
尚无余票
剩余票数:0
尚无余票
剩余票数:0
尚无余票
剩余票数:0
尚无余票
剩余票数:0
尚无余票
剩余票数:0
尚无余票
剩余票数:0
尚无余票Process finished with exit code 0
另一个版本:
from multiprocessing import Process, Lock
import time
import json
import random
def search(num):
dic = json.load(open('db'))
time.sleep(random.random())
print("第%s个人查到余票还剩下%s张" % (num, dic['count']))
def get(num, lock):
lock.acquire()
dic = json.load(open('db'))
time.sleep(random.random())
if dic['count'] > 0:
print('\033[31m 第%s个人买到票了\033[0m' % num)
dic['count'] -= 1
json.dump(dic, open('db', 'w'))
time.sleep(random.random())
else:
print('\033[32m 第%s个人没有买到票\033[0m' % num)
lock.release()
if __name__ == '__main__':
l = Lock()
for i in range(100):
p_search = Process(target=search, args=(i + 1,))
p_search.start()
for i in range(10):
p_get = Process(target=get, args=(i + 1, l))
p_get.start()
结果:(略)
加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改。加锁牺牲了速度,但是却保证了数据的安全。
#因此我们最好找寻一种解决方案能够兼顾:1、效率高(多个进程共享一块内存的数据)2、帮我们处理好锁问题。
mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。 队列和管道都是将数据存放于内存中 队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来, 我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。
事件
事件(Event),用于线程间通信,即程序中的其一个线程需要通过判断某个线程的状态来确定自己下一步的操作,就用到了event对象。
事件处理的机制:
全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。
clear:将“Flag”设置为False。
set:将“Flag”设置为True。
红绿灯实例:
from multiprocessing import Process, Event
import time
import random
def traffic_light(e):
'''信号灯函数'''
while 1: # 红绿灯得一直亮着,要么是红灯要么是绿灯
if e.is_set(): # True,代表绿灯亮,那么此时代表可以过车
time.sleep(5) # 所以在这让灯等5秒钟,这段时间让车过
print('\033[31m红灯亮!\n车辆等待中...\033[0m') # 绿灯亮了5秒后应该提示到红灯亮
e.clear() # 把is_set设置为False
else:
time.sleep(5) # 此时代表红灯亮了,此时应该红灯亮5秒,在此等5秒
print('\033[32m绿灯亮!\n车辆通过中...\033[0m') # 红的亮够5秒后,该绿灯亮了
e.set() # 将is_set设置为True
def car_status(num, e):
e.wait() # 车等在红绿灯,此时要看是红灯还是绿灯,如果is_set为True就是绿灯,此时可以过车
print('第%s辆车过去了' % num)
if __name__ == '__main__':
event = Event()
tra_light = Process(target=traffic_light, args=(event,)) # 信号灯的进程
tra_light.start()
for i in range(50): # 描述50辆车的进程
if i % 3 == 0:
time.sleep(random.randint(1, 5)) # 车辆出现时间随机
car = Process(target=car_status, args=(i + 1, event,))
car.start()
结果:(略)
队列和管道
队列和管道属于进城之间的通信机制。
队列(Queue)
创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。
Queue[maxsize]:maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。底层队列使用管道和锁定实现。另外,还需要运行支持线程以便队列中的数据传输到底层管道中。
设q为队列实例化对象:
方法 | 说明 |
---|---|
q.get( [ block [ ,timeout ] ] ) | 返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为True. 如果设置为False,将引发Queue.Empty异常(定义在Queue模块中)。timeout是可选超时时间,用在阻塞模式中。如果在制定的时间间隔内没有项目变为可用,将引发Queue.Empty异常。 |
q.get_nowait( ) | 同q.get(False)方法。 |
q.put(item [, block [,timeout ] ] ) | 将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默认为True。如果设置为False,将引发Queue.Empty异常(定义在Queue库模块中)。timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。 |
q.qsize() | 返回队列中目前项目的正确数量。此函数的结果并不可靠,因为在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。在某些系统上,此方法可能引发NotImplementedError异常。 |
q.empty() | 如果调用此方法时 q为空,返回True。如果其他进程或线程正在往队列中添加项目,结果是不可靠的。也就是说,在返回和使用结果之间,队列中可能已经加入新的项目。 |
q.full() | 如果q已满,返回为True. 由于线程的存在,结果也可能是不可靠的(参考q.empty()方法)。 |
q.close() | 关闭队列,防止队列中加入更多数据。调用此方法时,后台线程将继续写入那些已入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果q被垃圾收集,将自动调用此方法。关闭队列不会在队列使用者中生成任何类型的数据结束信号或异常。例如,如果某个使用者正被阻塞在get()操作上,关闭生产者中的队列不会导致get()方法返回错误。 |
q.cancel_join_thread() | 不会再进程退出时自动连接后台线程。这可以防止join_thread()方法阻塞。 |
q.join_thread() | 连接队列的后台线程。此方法用于在调用q.close()方法后,等待所有队列项被消耗。默认情况下,此方法由不是q的原始创建者的所有进程调用。调用q.cancel_join_thread()方法可以禁止这种行为。 |
实例:
from multiprocessing import Queue
q = Queue(3) # maxsize = 3
# put, get, put_nowait, get_nowait, full, empty
'''放数据'''
q.put(3)
q.put(3)
q.put(3)
# q.put(3)
# q.put(3) # 队列满了。阻塞在此处,等待别人取走,如果别人没有取走,程序将永远停留在这里
'''可以使用put_nowait,如果队列满了不会阻塞,但是会因为队列满了而报错。
因此我们用try来捕捉,这样的话程序就不会一直阻塞下去,但是也会丢掉这个消息'''
try:
q.put_nowait(3)
except:
print("队列满了")
'''放入数据之前,可以使用full查看队列状态,如果满了就不能继续put了,
返回值是True或False'''
print(q.full())
'''取数据'''
print(q.get())
print(q.get())
print(q.get())
# print(q.get()) # 因为已经取完了,再取就会发生发生阻塞,直到后续有数据放入为止
print(q.full()) # 取完了,所以是False
'''可以使用get_nowait,如果队列满了不会阻塞,但是会因为没取到值而报错。
因此我们可以用一个try语句来处理这个错误。这样程序不会一直阻塞下去。'''
try:
q.get_nowait(3)
except:
print("队列空了")
print(q.empty()) # 判断是否为空
结果:
队列满了
True
3
3
3
False
队列空了
TrueProcess finished with exit code 0
上面的例子没有引入进程,我们下面引入进程来操作队列:
import time
from multiprocessing import Process, Queue
def func(queue):
queue.put([time.asctime(), 'Hello', 'Python'])
if __name__ == '__main__':
q = Queue()
p = Process(target=func, args=(q,))
p.start()
print(q.get())
p.join()
结果:
['Mon Aug 27 15:26:47 2018', 'Hello', 'Python']
Process finished with exit code 0
上面是一个queue的简单应用,使用队列q对象调用get函数来取得队列中最先进入的数据。 接下来看一个稍微复杂一些的例子:
import os
import time
import multiprocessing
def input_queue(q):
info = str(os.getpid()) + '(put):' + str(time.asctime())
q.put(info)
print(info)
def output_queue(q):
info = q.get()
print('\033[32m%s\033[0m' % info)
if __name__ == '__main__':
multiprocessing.freeze_support()
record_one = []
record_two = []
queue = multiprocessing.Queue(3)
# 放入数据
for i in range(10):
p = multiprocessing.Process(target=input_queue, args=(queue,))
p.start()
record_one.append(p)
# 取出数据
for i in range(10):
p = multiprocessing.Process(target=output_queue, args=(queue,))
p.start()
record_one.append(p)
for p in record_one:
p.join()
for p in record_two:
p.join()
结果:
328(put):Mon Aug 27 15:45:34 2018
328(put):Mon Aug 27 15:45:34 2018
27164(put):Mon Aug 27 15:45:34 2018
27164(put):Mon Aug 27 15:45:34 2018
8232(put):Mon Aug 27 15:45:34 2018
8232(put):Mon Aug 27 15:45:34 2018
23804(put):Mon Aug 27 15:45:34 2018
23804(put):Mon Aug 27 15:45:34 2018
23004(put):Mon Aug 27 15:45:34 2018
23004(put):Mon Aug 27 15:45:34 2018
22184(put):Mon Aug 27 15:45:34 2018
22184(put):Mon Aug 27 15:45:34 2018
24948(put):Mon Aug 27 15:45:34 2018
24948(put):Mon Aug 27 15:45:34 2018
19704(put):Mon Aug 27 15:45:34 2018
19704(put):Mon Aug 27 15:45:34 2018
8200(put):Mon Aug 27 15:45:34 2018
8200(put):Mon Aug 27 15:45:34 2018
14380(put):Mon Aug 27 15:45:34 2018
14380(put):Mon Aug 27 15:45:34 2018Process finished with exit code 0
管道(Pipe)
Pipe([duplex]):在线程之间创建一条管道,并返回元祖(con1,con2),其中con1,con2表示管道两端连接的对象。
duplex:默认管道为全双工的,如果将duplex映射为False,con1只能用于接收,con2只能由于发送。
注意:必须在产生Process之前产生管道。
方法 | 说明 |
---|---|
con1.recv() | 接收conn2.send(obj)发送的对象。如果没有消息可接收,recv方法会一直阻塞。如果连接的另外一端已经关闭,那么recv方法会抛出EOFError。 |
con1.send(obj) | 通过连接发送对象。obj是与序列化兼容的任意对象。 |
con1.close() | 关闭连接。如果conn1被垃圾回收,将自动调用此方法。 |
con1.fileno() | 返回连接使用的整数文件描述符。 |
conn1.poll([timeout]) | 如果连接上的数据可用,返回True。timeout指定等待的最长时限。如果省略此参数,方法将立即返回结果。如果将timeout射成None,操作将无限期地等待数据到达。 |
conn1.recv_bytes([maxlength]) | 接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。如果进入的消息,超过了这个最大值,将引发IOError异常,并且在连接上无法进行进一步读取。如果连接的另外一端已经关闭,再也不存在任何数据,将引发EOFError异常。 |
conn.send_bytes(buffer [, offset [, size]]) | 通过连接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,然后调用c.recv_bytes()函数进行接收。 |
conn1.recv_bytes_into(buffer [, offset]) | 接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或类似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。如果消息长度大于可用的缓冲区空间,将引发BufferTooShort异常。 |
实例:
from multiprocessing import Process, Pipe
def func(conn):
conn.send("Hello,This is Python!") # 发送
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=func, args=(child_conn,))
p.start()
print(parent_conn.recv()) # 接收
p.join()
结果:
Hello,This is Python!
Process finished with exit code 0
应该特别注意管道端点的正确管理问题。如果是生产者或消费者中都没有使用管道的某个端点,就应将它关闭。这也说明了为何在生产者中关闭了管道的输出端,在消费者中关闭管道的输入端。如果忘记执行这些步骤,程序可能在消费者中的recv()操作上挂起。管道是由操作系统进行引用计数的,必须在所有进程中关闭管道后才能生成EOFError异常。因此,在生产者中关闭管道不会有任何效果,除非消费者也关闭了相同的管道端点。
下面的操作将引发EOFError:
from multiprocessing import Process, Pipe
def func(parent_conn, child_conn):
# parent_conn.close() # 写了close()将引发OSError
while 1:
try:
print(child_conn.recv())
except EOFError:
child_conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=func, args=(parent_conn, child_conn))
p.start()
child_conn.close()
parent_conn.send("Hello,This is Python!")
parent_conn.close()
p.join()
结果:
Hello,This is Python!
Manager
展望未来,基于消息传递的并发编程是大势所趋。即便是使用线程,推荐做法也是将程序设计为大量独立的线程集合,通过消息队列交换数据。这样极大地减少了对使用锁定和其他同步手段的需求,还可以扩展到分布式系统中。但进程间应该尽量避免通信,即便需要通信,也应该选择进程安全的工具来避免加锁带来的问题。
进程间数据是独立的,可以借助于队列或管道实现通信,二者都是基于消息传递的。虽然进程间数据独立,但可以通过Manager实现数据共享,事实上Manager的功能远不止于此:
from multiprocessing import Manager, Process, Lock
def work(d, lock):
with lock: # 不加锁而操作共享的数据,数据会出乱
d['count'] -= 1
if __name__ == '__main__':
lock = Lock()
with Manager() as m:
dic = m.dict({'count': 100})
p_l = []
for i in range(100):
p = Process(target=work, args=(dic, lock))
p_l.append(p)
p.start()
for p in p_l:
p.join()
print(dic)
结果:(略)
进程池
问题:为什么要有进程池?进程池的概念?
在程序实际处理问题过程中,忙时会有成千上万的任务需要被执行,闲时可能只有零星任务。那么在成千上万个任务需要被执行的时候,我们就需要去创建成千上万个进程么?首先,创建进程需要消耗时间,销毁进程也需要消耗时间。第二即便开启了成千上万的进程,操作系统也不能让他们同时执行,这样反而会影响程序的效率。因此我们不能无限制的根据任务开启或者结束进程。那么我们要怎么做呢?
在这里,要给大家介绍一个进程池的概念,定义一个池子,在里面放上固定数量的进程,有需求来了,就拿一个池中的进程来处理任务,等到处理完毕,进程并不关闭,而是将进程再放回进程池中继续等待任务。如果有很多任务需要执行,池中的进程数量不够,任务就要等待之前的进程执行任务完毕归来,拿到空闲进程才能继续执行。也就是说,池中进程的数量是固定的,那么同一时间最多有固定数量的进程在运行。这样不会增加操作系统的调度难度,还节省了开闭进程的时间,也一定程度上能够实现并发效果。
语法:Pool([numprocess [,initializer [, initargs]]]):创建进程池
参数:
- numprocess:要创建的进程数,默认为cpu_count()的值。
- initializer:是每个工作进程启动时要执行的可调用对象,默认为None。
- initargs:是要传给initializer的参数组。
方法 | 说明 |
---|---|
apply(func[, args=()[, kwds={}]]) | 该函数用于传递不定参数,主进程会被阻塞直到函数执行结束(不建议使用,并且3.x以后不在出现)。 |
apply_async(func[, args=()[, kwds={}[, callback=None]]]) | 与apply用法一样,但它是非阻塞且支持结果返回进行回调。 |
map(func, iterable[, chunksize=None]) | Pool类中的map方法,与内置的map函数用法行为基本一致,它会使进程阻塞直到返回结果。 注意,虽然第二个参数是一个迭代器,但在实际使用中,必须在整个队列都就绪后,程序才会运行子进程。 |
close() | 关闭进程池(pool),使其不在接受新的任务。 |
terminate() |
结束工作进程,不在处理未处理的任务。 |
join() |
主进程阻塞等待子进程的退出,join方法必须在close或terminate之后使用。 |
下面演示一些实例:
import time
from multiprocessing import Pool
def run(num):
'''
计算num的num次方
:param num: 数字
:return: num的num次方
'''
time.sleep(1)
return num ** num
if __name__ == '__main__':
lst = [1, 2, 3, 4, 5, 6]
print("顺序:")
t_one = time.time()
for fn in lst:
run(fn)
t_two = time.time()
print("执行时间:", t_two - t_one)
print("多进程:")
pool = Pool(5)
res = pool.map(run, lst)
pool.close()
pool.join()
t_thr = time.time()
print("执行时间:", t_thr - t_two)
print(res)
结果:
顺序:
执行时间: 6.0030517578125
多进程:
执行时间: 2.216660976409912
[1, 4, 27, 256, 3125, 46656]Process finished with exit code 0
上例是一个创建多个进程并发处理与顺序执行处理同一数据,所用时间的差别。从结果可以看出,并发执行的时间明显比顺序执行要快很多,但是进程是要耗资源的,所以平时工作中,进程数也不能开太大。
程序中的res表示全部进程执行结束后全局的返回结果集,run函数有返回值,所以一个进程对应一个返回结果,这个结果存在一个列表中,也就是一个结果堆中,实际上是用了队列的原理,等待所有进程都执行完毕,就返回这个列表(列表的顺序不定)。
对Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),让其不再接受新的Process了。
再来分析一个例子:
import time
from multiprocessing import Pool
def run(num):
time.sleep(2)
print(num ** num)
if __name__ == '__main__':
start_time = time.time()
lst = [1, 2, 3, 4, 5, 6]
pool = Pool(10)
pool.map(run, lst)
pool.close()
pool.join()
end_time = time.time()
print("时间:", end_time - start_time)
结果:
42561
466563125
27
时间: 2.455129384994507
Process finished with exit code 0
再运行:
14
27
256
3125
46656
时间: 2.4172260761260986Process finished with exit code 0
问题:结果中为什么还有空行和没有折行的数据呢?
其实这跟进程调度有关,当有多个进程并行执行时,每个进程得到的时间片时间不一样,哪个进程接受哪个请求以及执行完成时间都是不定的,所以会出现输出乱序的情况。那为什么又会有没这行和空行的情况呢?因为有可能在执行第一个进程时,刚要打印换行符时,切换到另一个进程,这样就极有可能两个数字打印到同一行,并且再次切换回第一个进程时会打印一个换行符,所以就会出现空行的情况。
利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,这时候进程池Pool发挥作用的时候就到了。
Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。这里有一个简单的例子:
from multiprocessing import Pool
from time import sleep
def func(num):
for i in range(3):
print(i, num)
sleep(2)
def main():
pool = Pool(3)
for i in range(3, 6):
res = pool.apply_async(func, (i,))
pool.close()
pool.join()
if __name__ == '__main__':
main()
结果:(略)
容量为3,每次都只是运行三个进程。
上一篇: 进程间通信介绍
下一篇: 信号的捕捉以及SIGCHLD信号