欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

荐 【操作系统知识】python实现多进程

程序员文章站 2022-06-15 12:39:10
5. python实现多进程Unix/Linux操作系统提供了一个fork()系统调用,它非常特殊。普通的函数调用,调用一次返回一次,但是fork()调用一次返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后分别在父进程和子进程内返回。子进程永远返回0,而父进程返回进程的ID。一个父进程可以fork出多个子进程,所以父进程要记下每个子进程的ID,而子进程只主要调用getppid()就可以拿到父进程的ID。父进程、子进程执行的顺序没有规律,完全取决于操作系统的调度...

5. python实现多进程

Unix/Linux操作系统提供了一个fork()系统调用,它非常特殊。普通的函数调用,调用一次返回一次,但是fork()调用一次返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后分别在父进程和子进程内返回。

子进程永远返回0,而父进程返回进程的ID。一个父进程可以fork出多个子进程,所以父进程要记下每个子进程的ID,而子进程只主要调用getppid()就可以拿到父进程的ID。

父进程、子进程执行的顺序没有规律,完全取决于操作系统的调度算法。各个进程有独立的运行空间,不共享全局变量;有时会因父进程提前退出,子进程的父进程和开始的不一致。

# os模块封装了常见的系统调用

import os

print(f"Process {os.getpid()} start...")  # unix/linux/mac系统中

pid = os.fork()

if pid < 0:
    print("fork()调用失败")
if pid == 0:
    print(f"父进程pid: {os.getppid()}, 子进程pid: {os.gerpid()}.")  # 子进程在系统的pid不为0
else:
    print(f"子进程pid: {os.getpid()}, 父进程pid: {pid}")  # 父进程返回子进程的ID

# 运行结果
'''
Process 3362 start...
父进程pid:3362, 子进程pid:3363
子进程pid:3363, 子进程pid:3362
'''

由于Windows没有fork调用,而Python是跨平台的,所以可以调用multiprocessing模块中的Process类来实现跨平台的多进程。

Process()的语法:Process([group[, target[, name [, args[, kwargs]]]]]),group参数未使用,默认为None;参数target表示这个进程实例所调用的对象;args表示调用这个对象的位置和参数元组;kwargs表示调用对象的关键子参数字典;name子进程名称。

Process属性方法

方法/属性 说明
start() 启动进程,调用进程中的run()方法。
run() 进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法。
timinate() 强制终止进程,不会进行任何清理操作。如果该进程终止前,创建了自进程,那么该子进程在其强制结束后变为僵尸进程;如果进程还保存了一个锁,那么也将不会被释放,进而导致死锁。使用时,要注意。
is_alive() 判断某进程是否存活,存活返回True,否则False。
join([timeout]) 主线程等待子线程终止。timeout为可选超时时间;p.join()只能join住start开启的进程,而不能join住run开启的进程
daemon() 默认值False, 如果设置为True,代表该进程为后台守护进程;当该进程的父进程终止时,该进程也随之终止;并且设置为True后,该进程不能创建子进程,设置该属性必须在start()之前。
exitcode 进程运行时为None,如果为-N,表示信号N结束了。
authkey 进程身份验证,默认是由os.urandom()随机生成32字符的字符串。这个键的用途是设计涉及网络链接的低层进程间的通信提供安全性,这类连接只有在具有相同身份验证才能成功。
from multiprocessing import Process
import os
import time

# 子进程要执行的代码
def run_proc(name):
    time.sleep(3)
    print(f"子进程{name}运行中,pid={os.getpid()},父进程的pid={os.getppid()}")

if __name__ == "__main__":
    print(f"父进程是{os.getpid()}.")
    p = Process(target=run_proc, args=('test',))
    print("子进程开始启动.")
    p.start()  # 启动进程
    p.join()  # 阻塞当前进程,直到调用join方法的那个进程执行完,在继续执行当前进程
    print("子进程运行结束")

# 结果
'''
父进程是12324.
子进程开始启动.
子进程test运行中,pid=4472,父进程的pid=12324
子进程运行结束
'''

创建自己的进程类,继承于Process类:

from multiprocessing import Process
import os
import time

class MyProcess(Process):
    def __init__(self, interval):
        super().__init__()
        self.interval = interval

    def run(self):
        print("子进程")
        start_time = time.time()
        time.sleep(self.interval)
        stop_time = time.time()
        print(f"子进程id: {os.getpid()}, 父进程id:{os.getppid()}, 共执行了{stop_time-start_time: .2f} 秒.")

if __name__ == "__main__":
    print("主进程")
    startTime = time.time()
    p = MyProcess(2)
    p.start()
    p.join()
    stopTime = time.time()
    print(f"子进程结束,花费了{stopTime-startTime: .2f}")

# 结果
'''
主进程
子进程
子进程id: 3008, 父进程id:13084, 共执行了 2.00 秒.
子进程结束,花费了 2.79
'''

如果需要创建很多进程,需要用到进程池Pool方法。初始化进程时,可以指定一个最大进程数(默认为CPU核数),当池中的进程数达到最大值时,该请求就会等待,直到池中有进程结束,才会创建新的进程来执行。

from multiprocessing import Pool
import os
import time
import random

# 进程类
def worker(msg):
    start_time = time.time()
    print(f"{msg}开始执行,执行进程id为{os.getpid()},父进程id为{os.getppid()}")
    time.sleep(random.random()*2)
    stop_time = time.time()
    print(msg, f"执行完毕,耗时{stop_time-start_time: .2f}秒。")

if __name__ == "__main__":
    print("----开始执行----")
    start = time.time()
    pool = Pool(3)
    for i in range(0, 10):
        pool.apply_async(worker, (i,))  # 异步非阻塞
    pool.close()  # 关闭进程池,关闭后不再就收新请求
    pool.join()  # 必须放在close语句后面
    stop = time.time()
    print(f"执行结束,总耗时{stop-start:.2f}")

# 结果
'''
----开始执行----
0开始执行,执行进程id为4640,父进程id为7116
1开始执行,执行进程id为8,父进程id为7116
2开始执行,执行进程id为7728,父进程id为7116
0 执行完毕,耗时 0.09秒。
3开始执行,执行进程id为4640,父进程id为7116
2 执行完毕,耗时 0.10秒。
4开始执行,执行进程id为7728,父进程id为7116
4 执行完毕,耗时 0.04秒。
5开始执行,执行进程id为7728,父进程id为7116
3 执行完毕,耗时 0.20秒。
6开始执行,执行进程id为4640,父进程id为7116
6 执行完毕,耗时 0.12秒。
7开始执行,执行进程id为4640,父进程id为7116
5 执行完毕,耗时 1.38秒。
8开始执行,执行进程id为7728,父进程id为7116
1 执行完毕,耗时 1.95秒。
9开始执行,执行进程id为8,父进程id为7116
8 执行完毕,耗时 0.48秒。
7 执行完毕,耗时 1.81秒。
9 执行完毕,耗时 1.58秒。
执行结束,总耗时4.63
'''

如果使用进程池pool创建进程的话,就需要使用Manager().Queue()

from multiprocessing import Manager, Pool
import os

# queue,实现多进程间的数据传递,其实是个消息队列
def write(q):
    print(f"--开始执行写进程 {os.getpid()}--")
    for value in ['A', 'B', 'C']:
        print(f"把 {value} 放入队列")
        q.put(value)

def read(q):
    print(f"--开始执行读进程 {os.getpid()}--")
    for i in range(q.qsize()):
        print(f"从队列读取 {q.get(True)}")

if __name__ == "__main__":
    print(f"--开始执行主进程 {os.getpid()}--")
    q = Manager().Queue()
    p = Pool()
    p.apply(write, args=(q,))  # apply 阻塞,上个进程结束才会执行下一个进程
    p.apply(read, args=(q,))
    p.close()  # 关闭pool 不再接收请求
    p.join()  # 主进程阻塞,等待子进程运行结束,必须在close以后
    print("--主进程结束--")

# 结果
'''
--开始执行主进程 6012--
--开始执行写进程 11524--
把 A 放入队列
把 B 放入队列
把 C 放入队列
--开始执行读进程 11524--
从队列读取 A
从队列读取 B
从队列读取 C
--主进程结束--
'''

当子进程不是自身,而是一个外部进程时,创建子进程后,还需要控制子进程的输入和输出。subprocess模块可以非常方便的启动子进程,然后可以通过communicate()方法输入。

python 中subprocess

import subprocess

print('$ nslookup')
p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, err = p.communicate(b'set q=mx\npython.org\nexit\n')
print(output.decode("gbk"))
print('exit code:', p.returncode)

# 结果
'''
$ nslookup
默认服务器:  dnspai-public-dns.dnspai.com
Address:  101.226.4.6

> > 服务器:  dnspai-public-dns.dnspai.com
Address:  101.226.4.6

python.org	MX preference = 50, mail exchanger = mail.python.org
> 
exit code: 0
'''

本文地址:https://blog.csdn.net/holysll/article/details/107226353