荐 【操作系统知识】python实现多进程
5. python实现多进程
Unix/Linux操作系统提供了一个
fork()
系统调用,它非常特殊。普通的函数调用,调用一次返回一次,但是fork()
调用一次返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后分别在父进程和子进程内返回。
子进程永远返回0,而父进程返回进程的ID。一个父进程可以fork出多个子进程,所以父进程要记下每个子进程的ID,而子进程只主要调用
getppid()
就可以拿到父进程的ID。
父进程、子进程执行的顺序没有规律,完全取决于操作系统的调度算法。各个进程有独立的运行空间,不共享全局变量;有时会因父进程提前退出,子进程的父进程和开始的不一致。
# os模块封装了常见的系统调用
import os
print(f"Process {os.getpid()} start...") # unix/linux/mac系统中
pid = os.fork()
if pid < 0:
print("fork()调用失败")
if pid == 0:
print(f"父进程pid: {os.getppid()}, 子进程pid: {os.gerpid()}.") # 子进程在系统的pid不为0
else:
print(f"子进程pid: {os.getpid()}, 父进程pid: {pid}") # 父进程返回子进程的ID
# 运行结果
'''
Process 3362 start...
父进程pid:3362, 子进程pid:3363
子进程pid:3363, 子进程pid:3362
'''
由于Windows没有fork调用,而Python是跨平台的,所以可以调用multiprocessing模块中的Process类来实现跨平台的多进程。
Process()的语法:
Process([group[, target[, name [, args[, kwargs]]]]])
,group参数未使用,默认为None;参数target表示这个进程实例所调用的对象;args表示调用这个对象的位置和参数元组;kwargs表示调用对象的关键子参数字典;name子进程名称。
Process属性方法
方法/属性 | 说明 |
---|---|
start() | 启动进程,调用进程中的run()方法。 |
run() | 进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法。 |
timinate() | 强制终止进程,不会进行任何清理操作。如果该进程终止前,创建了自进程,那么该子进程在其强制结束后变为僵尸进程;如果进程还保存了一个锁,那么也将不会被释放,进而导致死锁。使用时,要注意。 |
is_alive() | 判断某进程是否存活,存活返回True,否则False。 |
join([timeout]) | 主线程等待子线程终止。timeout为可选超时时间;p.join()只能join住start开启的进程,而不能join住run开启的进程 |
daemon() | 默认值False, 如果设置为True,代表该进程为后台守护进程;当该进程的父进程终止时,该进程也随之终止;并且设置为True后,该进程不能创建子进程,设置该属性必须在start()之前。 |
exitcode | 进程运行时为None,如果为-N,表示信号N结束了。 |
authkey | 进程身份验证,默认是由os.urandom()随机生成32字符的字符串。这个键的用途是设计涉及网络链接的低层进程间的通信提供安全性,这类连接只有在具有相同身份验证才能成功。 |
from multiprocessing import Process
import os
import time
# 子进程要执行的代码
def run_proc(name):
time.sleep(3)
print(f"子进程{name}运行中,pid={os.getpid()},父进程的pid={os.getppid()}")
if __name__ == "__main__":
print(f"父进程是{os.getpid()}.")
p = Process(target=run_proc, args=('test',))
print("子进程开始启动.")
p.start() # 启动进程
p.join() # 阻塞当前进程,直到调用join方法的那个进程执行完,在继续执行当前进程
print("子进程运行结束")
# 结果
'''
父进程是12324.
子进程开始启动.
子进程test运行中,pid=4472,父进程的pid=12324
子进程运行结束
'''
创建自己的进程类,继承于Process类:
from multiprocessing import Process
import os
import time
class MyProcess(Process):
def __init__(self, interval):
super().__init__()
self.interval = interval
def run(self):
print("子进程")
start_time = time.time()
time.sleep(self.interval)
stop_time = time.time()
print(f"子进程id: {os.getpid()}, 父进程id:{os.getppid()}, 共执行了{stop_time-start_time: .2f} 秒.")
if __name__ == "__main__":
print("主进程")
startTime = time.time()
p = MyProcess(2)
p.start()
p.join()
stopTime = time.time()
print(f"子进程结束,花费了{stopTime-startTime: .2f}")
# 结果
'''
主进程
子进程
子进程id: 3008, 父进程id:13084, 共执行了 2.00 秒.
子进程结束,花费了 2.79
'''
如果需要创建很多进程,需要用到进程池Pool方法。初始化进程时,可以指定一个最大进程数(默认为CPU核数),当池中的进程数达到最大值时,该请求就会等待,直到池中有进程结束,才会创建新的进程来执行。
from multiprocessing import Pool
import os
import time
import random
# 进程类
def worker(msg):
start_time = time.time()
print(f"{msg}开始执行,执行进程id为{os.getpid()},父进程id为{os.getppid()}")
time.sleep(random.random()*2)
stop_time = time.time()
print(msg, f"执行完毕,耗时{stop_time-start_time: .2f}秒。")
if __name__ == "__main__":
print("----开始执行----")
start = time.time()
pool = Pool(3)
for i in range(0, 10):
pool.apply_async(worker, (i,)) # 异步非阻塞
pool.close() # 关闭进程池,关闭后不再就收新请求
pool.join() # 必须放在close语句后面
stop = time.time()
print(f"执行结束,总耗时{stop-start:.2f}")
# 结果
'''
----开始执行----
0开始执行,执行进程id为4640,父进程id为7116
1开始执行,执行进程id为8,父进程id为7116
2开始执行,执行进程id为7728,父进程id为7116
0 执行完毕,耗时 0.09秒。
3开始执行,执行进程id为4640,父进程id为7116
2 执行完毕,耗时 0.10秒。
4开始执行,执行进程id为7728,父进程id为7116
4 执行完毕,耗时 0.04秒。
5开始执行,执行进程id为7728,父进程id为7116
3 执行完毕,耗时 0.20秒。
6开始执行,执行进程id为4640,父进程id为7116
6 执行完毕,耗时 0.12秒。
7开始执行,执行进程id为4640,父进程id为7116
5 执行完毕,耗时 1.38秒。
8开始执行,执行进程id为7728,父进程id为7116
1 执行完毕,耗时 1.95秒。
9开始执行,执行进程id为8,父进程id为7116
8 执行完毕,耗时 0.48秒。
7 执行完毕,耗时 1.81秒。
9 执行完毕,耗时 1.58秒。
执行结束,总耗时4.63
'''
如果使用进程池pool创建进程的话,就需要使用
Manager().Queue()
。
from multiprocessing import Manager, Pool
import os
# queue,实现多进程间的数据传递,其实是个消息队列
def write(q):
print(f"--开始执行写进程 {os.getpid()}--")
for value in ['A', 'B', 'C']:
print(f"把 {value} 放入队列")
q.put(value)
def read(q):
print(f"--开始执行读进程 {os.getpid()}--")
for i in range(q.qsize()):
print(f"从队列读取 {q.get(True)}")
if __name__ == "__main__":
print(f"--开始执行主进程 {os.getpid()}--")
q = Manager().Queue()
p = Pool()
p.apply(write, args=(q,)) # apply 阻塞,上个进程结束才会执行下一个进程
p.apply(read, args=(q,))
p.close() # 关闭pool 不再接收请求
p.join() # 主进程阻塞,等待子进程运行结束,必须在close以后
print("--主进程结束--")
# 结果
'''
--开始执行主进程 6012--
--开始执行写进程 11524--
把 A 放入队列
把 B 放入队列
把 C 放入队列
--开始执行读进程 11524--
从队列读取 A
从队列读取 B
从队列读取 C
--主进程结束--
'''
当子进程不是自身,而是一个外部进程时,创建子进程后,还需要控制子进程的输入和输出。subprocess模块可以非常方便的启动子进程,然后可以通过communicate()方法输入。
import subprocess
print('$ nslookup')
p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, err = p.communicate(b'set q=mx\npython.org\nexit\n')
print(output.decode("gbk"))
print('exit code:', p.returncode)
# 结果
'''
$ nslookup
默认服务器: dnspai-public-dns.dnspai.com
Address: 101.226.4.6
> > 服务器: dnspai-public-dns.dnspai.com
Address: 101.226.4.6
python.org MX preference = 50, mail exchanger = mail.python.org
>
exit code: 0
'''
本文地址:https://blog.csdn.net/holysll/article/details/107226353