欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

铁乐学python_Day38_多进程和multiprocess模块1

程序员文章站 2022-04-24 16:39:26
【进程】 运行中的程序就是一个进程。 所有的进程都是通过它的父进程来创建的。 因此,运行起来的python程序也是一个进程,那么我们也可以在程序中再创建进程。 多个进程可以实现并发效果,程序中存在多个进程的时候,在某些时候,就会让程序的执行速度变快。 以之前所学的知识,并不能实现创建进程这个功能,所... ......

铁乐学python_Day38_多进程和multiprocess模块1

【进程】

运行中的程序就是一个进程。
所有的进程都是通过它的父进程来创建的。
因此,运行起来的python程序也是一个进程,那么我们也可以在程序中再创建进程。
多个进程可以实现并发效果,程序中存在多个进程的时候,在某些时候,就会让程序的执行速度变快。
以之前所学的知识,并不能实现创建进程这个功能,所以就需要借助到python中强大的模块--multiprocess。

仔细说来,multiprocess不是一个模块而是python中一个操作、管理进程的包。
之所以叫multi是取自multiple的多功能的意思,在这个包中几乎包含了和进程有关的所有子模块。
由于提供的子模块非常多,大致可分为四个部分:创建进程部分,进程同步部分,进程池部分,进程之间数据共享。

multiprocess.process模块

process模块是一个创建进程的模块,借助这个模块,可以完成进程的创建。

Process([group [, target [, name [, args [, kwargs]]]]]),
由该类实例化得到的对象,表示一个子进程中的任务(尚未启动)

强调:
1. 需要使用关键字的方式来指定参数
2. args指定的为传给target函数的位置参数,是一个元组形式,当只有一个参数时必须有逗号表明为元组。

参数介绍:
1 group参数未使用,值始终为None
2 target表示调用对象,即子进程要执行的任务
3 args表示调用对象的位置参数元组
4 kwargs表示调用对象的字典
5 name为子进程的名称

方法介绍:
p.start():启动进程,并调用该子进程中的p.run()
p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法。
p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁。
p.is_alive():如果p仍然运行,返回True。
p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。
timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程。

属性介绍:
p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,
当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置。
p.name:进程的名称。
p.pid:进程的pid。
p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束。
p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。
这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功。

在windows中使用process模块的注意事项:

在Windows操作系统中由于没有fork(linux操作系统中创建进程的机制),
在创建子进程的时候会自动 import 启动它的这个文件,而在 import 的时候又执行了整个文件。
因此如果将process()直接写在文件中就会无限递归创建子进程报错。
所以必须把创建子进程的部分使用if name ==‘main’ 判断保护起来,import 的时候,就不会递归运行了。

附:
主进程默认会等待子进程执行完毕之后才结束;
主进程和子进程之间的代码是异步的;

为什么主进程要等待子进程结束?
因为回收子进程的资源靠的是父进程,如果在子进程还未结束前父进程就意外终止结束了,
那么和这个父进程有关的子进程会变成僵尸进程,资源没有被回收但又占用了。
所以要尽量避免产生僵尸进程。

开启一个进程是有时间开销的 :操作系统响应开启进程指令,给这个进程分配必要的资源等。

使用process模块创建进程

在一个python进程中开启子进程,start方法和并发效果。

#!/usr/bin/env python
# _*_ coding: utf-8 _*_

import time
from multiprocessing import Process

def func(name):
    print('hello', name)
    print('我是子进程')

if __name__ == '__main__':
    # 实例化一个子进程,执行func函数,传输参数
    p = Process(target=func, args=('tiele',))
    # 运行子进程对象
    p.start()
    time.sleep(1)
    print('执行主进程的内容了')

效果如下:
hello tiele
我是子进程
执行主进程的内容了

Join方法

join方法能够检测到子进程是否已经执行完了,阻塞直到子进程执行结束。

#!/usr/bin/env python
# _*_ coding: utf-8 _*_

import time
from multiprocessing import Process

def func(name):
    print('hello', name)
    time.sleep(3)
    print('我是子进程')

if __name__ == '__main__':
    # 实例化一个子进程,执行func函数,传输参数
    p = Process(target=func, args=('tiele',))
    # 运行子进程对象
    p.start()
    # 主线程等待p终止
    p.join()
    print('我是父进程')

没加join之前:(父进程不管子进程运行完没有直接跟着执行后面的代码)
我是父进程
hello tiele
我是子进程

加了join之后:(父进程必須等待子进程运行完后才接着运行后面的代码)
hello tiele
我是子进程
我是父进程

查看主进程和子进程的进程号
例:
import os
from multiprocessing import Process

def f(x):
    print('子进程id :', os.getpid(), '父进程id :', os.getppid())
    return 

if __name__ == '__main__':
    print('主进程id :', os.getpid())
    for i in range(5):
        p = Process(target=f, args=(i,))
        p.start()

主进程id : 2788
子进程id : 1140 父进程id : 2788
子进程id : 1348 父进程id : 2788
子进程id : 6536 父进程id : 2788
子进程id : 4896 父进程id : 2788
子进程id : 5680 父进程id : 2788

多个进程同时运行(注意,子进程的执行顺序不是根据启动顺序决定的)

例:(异步非阻塞)
import os
import time
from multiprocessing import Process

def f(x):
    print('子进程id :', os.getpid(), ' 进程 :', x)
    time.sleep(1)

if __name__ == '__main__':
    print('主进程id :', os.getpid())
    for i in range(5):
        p = Process(target=f, args=(i,))
        p.start()
    print(10*'*')

主进程id : 3068
**********
子进程id : 5148  进程 : 1
子进程id : 5960  进程 : 0
子进程id : 4420  进程 : 3
子进程id : 5856  进程 : 2
子进程id : 5464  进程 : 4

例:join-同步阻塞,完全按顺序且运行完一个子进程后再到下一个
import os
import time
from multiprocessing import Process

def f(i):
    print('子进程id :', os.getpid(), ' 进程 :', i)
    time.sleep(1)

if __name__ == '__main__':
    print('主进程id :', os.getpid())
    for i in range(5):
        p = Process(target=f, args=(i,))
        p.start()
        p.join()  # 在运行多进程的for里面加join,就从异步非阻塞变成了同步阻塞了
    print(10*'*')

主进程id : 5688
子进程id : 6728  进程 : 0
子进程id : 1432  进程 : 1
子进程id : 4748  进程 : 2
子进程id : 3488  进程 : 3
子进程id : 3116  进程 : 4
**********

例:join-异步阻塞,多个子进程同时运行,且所有子进程运行完后才执行主进程中join后面的代码

import os
import random
import time
from multiprocessing import Process

def f(i):
    print('子进程id :', os.getpid())
    # 随机睡眠1-5秒
    time.sleep(random.randint(1, 5))
    print(' 进程 :', i)

if __name__ == '__main__':
    print('主进程id :', os.getpid())
    p_list = [ ]
    for i in range(5):
        p = Process(target=f, args=(i,))
        p.start()
        p_list.append(p)
    for l in p_list:
        print(l)
        l.join()
    print(10*'*')

主进程id : 5112
<Process(Process-1, started)>
子进程id : 5864
子进程id : 6820
子进程id : 5632
子进程id : 6612
子进程id : 2812
 进程 : 3
 进程 : 0
<Process(Process-2, started)>
 进程 : 1
 进程 : 4
<Process(Process-3, started)>
 进程 : 2
<Process(Process-4, stopped)>
<Process(Process-5, stopped)>
**********

继承Process类的形式开启进程
import os
from multiprocessing import Process

class MyProcess(Process):
    def __init__(self,name):
        super().__init__()
        self.name = name
    def run(self):
        print('子进程:', os.getpid())
        print('%s 正在超进化' % self.name)

if __name__ == '__main__':

    p1 = MyProcess('暴龙兽')
    p2 = MyProcess('天使兽')
    p3 = MyProcess('加鲁鲁')

    # start方法会自动调用类中的run方法
    p1.start()
    p2.start()
    p3.start()

    p1.join()
    p2.join()
    p3.join()

    print('主进程:', os.getpid())

子进程: 5844
暴龙兽 正在超进化
子进程: 4536
天使兽 正在超进化
子进程: 6016
加鲁鲁 正在超进化
主进程: 6968

子进程和父进程的数据隔离
例:进程之间的数据是隔离的。

from multiprocessing import Process
n = 100
def work():
    global n
    n = n - 1
    print('子进程内: ', n)

if __name__ == '__main__':
    p = Process(target=work)
    p.start()
    p.join()
    print('主进程内: ', n)

子进程内:  99
主进程内:  100

守护进程
会随着主进程的代码执行完毕(注:代码执行完毕并不代表主进程结束!)而结束。

主进程创建守护进程
  其一:守护进程会在主进程代码执行完毕后就终止;
  其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children

注意:进程之间是互相独立的,主进程代码运行完毕,守护进程随即终止。
因此,守护进程很适合用来做循环隔多长时间发送表示主进程还存活的信息这种。

import time
from multiprocessing import Process

def func():
    print('son start')
    while True:
        time.sleep(1)
        print(time.ctime(), 'sos')

def func2():
    print('start : in func2')
    time.sleep(6)
    print('end : in func2')

if __name__ == '__main__':
    p1 = Process(target=func)
    # 在一个进程开启之前可以设置它为一个守护进程
    p1.daemon = True
    p1.start()
    p2 = Process(target=func2)
    p2.start()
    time.sleep(2)
    print('主进程代码执行完毕')


son start
start : in func2
Mon May 14 16:40:35 2018 sos
主进程代码执行完毕
end : in func2

分析:
    主进程的代码大概在2s多的时候就结束了.
    p2子进程在6s多的时候结束.
    p1是在什么时候结束的?
    p1做为守护进程是在主进程的代码执行完毕之后就结束了.从结果看只打印出一次sos信息也可以侧面验证。

    注:主进程会等待子进程的结束而结束,但守护进程守护的是主进程的代码执行完毕而跟着结束。

守护进程的意义:
    子进程会随着主进程代码的执行完毕而结束;
    注意:守护进程不会关心主进程什么时候结束,它只关心主进程中的代码什么时候执行完毕。

守护进程的作用:
    守护主进程,程序报信第三方主进程还活着;
    主进程开启的时候就建立一个守护进程,负责每隔N秒 就给检测程序发一条消息。

例:增加了子进程join方法后就可看出和上例明显不同了。
import time
from multiprocessing import Process

def func():
    print('son start')
    while True:
        time.sleep(1)
        print(time.ctime(), 'sos')

def func2():
    print('start : in func2')
    time.sleep(6)
    print('end : in func2')

if __name__ == '__main__':
    p1 = Process(target=func)
    # 在一个进程开启之前可以设置它为一个守护进程
    p1.daemon = True
    p1.start()
    p2 = Process(target=func2)
    p2.start()
    time.sleep(2)
    p2.join()
    print('主进程代码执行完毕')

son start
start : in func2
Mon May 14 16:56:18 2018 sos
Mon May 14 16:56:19 2018 sos
Mon May 14 16:56:20 2018 sos
Mon May 14 16:56:21 2018 sos
Mon May 14 16:56:22 2018 sos
Mon May 14 16:56:23 2018 sos
end : in func2
主进程代码执行完毕

例:Socket 时间戳服务器使用Process做多进程(模拟qq聊天服务器(转消息)并发)
注:Process多个子进程里面不能使用input 这种会引起阻塞的I/O.

#!/usr/bin/env python
# _*_ coding: utf-8 _*_
import time
from socket import *
from multiprocessing import Process

server = socket(AF_INET, SOCK_STREAM)
server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
server.bind(('127.0.0.1', 9527))
server.listen(5)

def talk(conn,client_addr):
    while True:
        try:
            msg = conn.recv(1024)
            if not msg:break
            print(msg.decode('utf-8'), client_addr)
            '''
            Process 多个子进程里不能使用input,i/o 阻塞会引起错误,
            而且聊天服务器里也只是对信息做转发处理而己,这里模拟转发会比较复杂.
            所以模拟的只是对收到的信息加时间戳.
            '''
            result = '[%s] %s' % (time.ctime(), msg.decode('utf-8'))
            conn.send(result.encode('utf-8'))
        except Exception:
            break

if __name__ == '__main__':
    # windows下start进程一定要写到这下面
    print('正准备接受连接...')
    while True:
        conn, client_addr = server.accept()
        p = Process(target=talk, args=(conn, client_addr))
        p.start()

服务端:
D:\PortableSoft\Python35\python.exe E:/Python/重要的代码/Socket_聊天并发/TcpScoketMS.py
正准备接受连接...
独立日 ('127.0.0.1', 51735)
地球反击战 ('127.0.0.1', 51734)

客户端1:
D:\PortableSoft\Python35\python.exe E:/Python/重要的代码/Socket_聊天并发/TcpScoketMC.py
>>: 地球反击战
[Mon May 14 17:32:44 2018] 地球反击战
>>: 

客户端2:
D:\PortableSoft\Python35\python.exe E:/Python/重要的代码/Socket_聊天并发/TcpScoketMC.py
>>: 独立日
[Mon May 14 17:32:36 2018] 独立日
>>: 

进程对象的其他方法:terminate,is_alive

#!/usr/bin/env python
# _*_ coding: utf-8 _*_

import time
import sys
import random
from multiprocessing import Process

class MyProcess(Process):

    def __init__(self,name):
        super().__init__()
        self.name = name

    def run(self):
        print('[%s] 超进化' % self.name)
        time.sleep(random.randint(1, 5))
        self.viewBar(100)
        print('[%s] 进化完毕!')

    def viewBar(self, i):
        """
        进度条效果
        """
        for count in range(0, i + 1):
            second = 0.1
            time.sleep(second)
            sys.stdout.write('\r 超进化 >>>:%.0f%%' % count)
        sys.stdout.flush()

if __name__ == '__main__':

    p1 = MyProcess('亚古兽')
    p1.start()
    # 睡眠10秒以便展现run中的方法,当然主进程睡眠10秒仍未能执行完子进程的
    time.sleep(10)

    # 关闭进程,但不会立即关闭,所以使用is_alive立刻查看的结果可能还是存活
    p1.terminate()
    print('\n 进程存活:', p1.is_alive())

    print('进化失败')
    time.sleep(0.1)
    print('进程存活:', p1.is_alive())

[亚古兽] 超进化
 超进化 >>>:67%
 进程存活: True
进化失败
进程存活: False

由上例可看到,子进程运行中还未运行完毕的时候父进程被terminate关闭了。

如果将上例中的p1.terminate()注释掉,也就是不手动执行关闭进程,子进程就能正常执行完毕。
另,字符串格式化中:\r表示不换首行 

[亚古兽] 超进化
 超进化 >>>:57%
 进程存活: True
 超进化 >>>:58%进程存活: True
 超进化 >>>:100%[%s] 进化完毕

注意:在上例,self.name 是自定义覆盖掉了父类默认就有的属性,表示的是子进程的名字。
在外部,子进程.name是用来查看子进程的名字的,所以self.name是有特殊意义的,
一般不要动它,可以使用其它变量名来替代name。
同样,子进程.pid也可以查看到子进程的id号。

end