欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Python 使用multiprocessing 特别耗内存

程序员文章站 2022-05-19 18:03:52
什么是多线程/多进程 引用虫师的解释: 计算机程序只不过是磁盘中可执行的,二进制(或其它类型)的数据。它们只有在被读取到内存中,被操作系统调用的时候才开始它们的生命期。 进程(有时被称为重量级进程)是程序的一次执行。每个进程都有自己的地址空间,内存,数据栈以及其它记录其运行轨迹的辅助数据。操作系统管 ......

什么是多线程/多进程

引用的解释:

计算机程序只不过是磁盘中可执行的,二进制(或其它类型)的数据。它们只有在被读取到内存中,被操作系统调用的时候才开始它们的生命期。

进程(有时被称为重量级进程)是程序的一次执行。每个进程都有自己的地址空间,内存,数据栈以及其它记录其运行轨迹的辅助数据。操作系统管理在其上运行的所有进程,并为这些进程公平地分配时间。

线程(有时被称为轻量级进程)跟进程有些相似,不同的是,所有的线程运行在同一个进程中,共享相同的运行环境。我们可以想像成是在主进程或“主线程”中并行运行的“迷你进程”。

 

为什么需要多线程/多进程

我们直接编写的爬虫程序是单线程的,在数据需求量不大时它能够满足我们的需求。

但如果数据量很大,比如要通过访问数百数千个url去爬取数据,单线程必须等待当前url访问完毕并且数据提取保存完成后才可以对下一个url进行操作,一次只能对一个url进行操作;

我们使用多线程/多进程的话,就可以实现对多个url同时进行操作。这样就能大大缩减了爬虫运行时间。

 

实现多线程/多进程

多线程

python提供了两组多线程接口,一是thread模块_thread,提供低等级接口;二是threading模块,在thread模块基础上进行封装,提供更容易使用的基于对象的接口,可以继承thread对象来实现多线程。

同时,还有其他线程相关的对象,如timer、lock等。

在这里,我们使用threading模块实现多线程。

1. 添加线程

threading.thread(target, args)

使用threading.thread()新建一个线程,target是需要执行的函数,args是需要传入该函数的参数,args接受一个tuple,即使只有一个参数也需要写成(x,)形式

import threading

print(threading.active_count()) # 显示当前激活的线程数
print(threading.enumerate()) # 显示当前激活的线程
print(threading.current_thread()) # 当前运行的线程 


def thread_job():
    print('this is a thread of %s' % threading.current_thread())

def main():
    thread = threading.thread(target=thread_job,) # 添加一个线程
    thread.start() # 开始该线程

if __name__ == '__main__':
    main()

2. 线程阻塞:join

join()的作用是调用该线程时,等待该线程完成后再继续往下运行。

join通常用于主线程与子线程之间,主线程等待子线程运行完毕后再继续执行,避免子程序和主程序同时运行,子程序还没有运行完的时候主程序就已经运行结束。

import threading
import time

# 定义一个fun,传入线程
def t1_job():
    print('t1 start\n')
    for i in range(10):
        time.sleep(0.1)
    print('t1 finish\n')
        
def t2_job():
    print('t2 start\n')
    print('t2 finish\n')
        
def main():
    thread1 = threading.thread(target=t1_job, name='t1') # 添加线程,准备执行thread_job,命名t1
    thread2 = threading.thread(target=t2_job, name='t2')
    
    thread1.start() # 执行该线程,没有添加join的时候,同步执行main和thread_job
    thread2.start()
    
    thread1.join() # 等待thread1完成后才进行下一步-主程序
    thread2.join() # 等待thread2完成后才进行下一步-主程序
    print('all done')

if __name__ == '__main__':
    main()

3. 信息传递:queue队列

queue是python标准库中的线程安全的队列(fifo)实现,提供了一个适用于多线程编程的先进先出的数据结构,即队列。

queue是一种先进先出的数据结构,一般来说读数据都从queue头读,写数据都从queue尾写入。

import threading
from queue import queue

def job(l, q):
    for i in range(len(l)):
        l[i] = l[i]**2
    q.put(l) # 线程中,return获取的值无法提取,需要放入q中

def multithreading():
    q = queue() # 队列
    threads = [] # 全部线程
    data = [[1, 2, 3], [3, 4, 5], [4,4,4], [5,5,5]]
    for i in range(4):
        # 4个线程来执行job函数
        t = threading.thread(target=job, args=(data[i], q))
        t.start()
        threads.append(t) # 当前线程加入全部线程中
        
    # 对主线程中的每一个线程都执行join()
    for thread in threads:
        thread.join()
   
    results = [] # 保存结果
    for _ in range(4):
        results.append(q.get()) # 从q中拿出值,每次只能按顺序拿出一个值
    print(results)
    
if __name__ == '__main__':
    multithreading()

# [[1, 4, 9], [9, 16, 25], [16, 16, 16], [25, 25, 25]]

4. 线程锁:lock

lock在不同线程使用同一共享内存时,能够确保线程之间互不影响。

使用lock的方法是:在每个线程执行运算修改共享内存之前执行lock.acquire()将共享内存上锁, 确保当前线程执行时,内存不会被其他线程访问;

执行运算完毕后使用lock.release()将锁打开, 保证其他的线程可以使用该共享内存。

lock.acquire()和lock.release()必须成对出现

# lock锁,当前线程运行完成后才进行下一进程
import threading

def job1():
    global a, lock
    lock.acquire() # 打开锁
    for i in range(10):
        a += 1
        time.sleep(0.2)
        print('job1', a)
    lock.release() # 关闭锁
    
def job2():
    global a, lock
    lock.acquire() # 打开锁
    for i in range(10):
        a += 10
        time.sleep(0.2)
        print('job2', a)
    lock.release() # 关闭锁
    
if __name__ == '__main__':
    lock = threading.lock() # lock锁
    a = 0
    t1 = threading.thread(target=job1)
    t2 = threading.thread(target=job2)
    t1.start()
    t2.start()

将上述代码中的lock.acquire()和lock.release()四行代码注释后运行,就是不加锁的情况,这时候输出结果都是混乱的。而加锁后,输出结果正常。

5. 线程池

线程池有几种方法可以实现,这里我们使用multiprocessing.dummy库。

from multiprocessing.dummy import pool as threadpool # 线程池
import threading

def job(i):
    print(i, '\n', threading.current_thread())
    
if __name__ == '__main__':
    pool = threadpool(4) # 创建一个包含4个线程的线程池
    pool.map(job, range(12))
    pool.close() # 关闭线程池的写入
    pool.join() # 阻塞,保证子线程运行完毕后再继续主进程 

 

多进程

多进程multiprocessing和多线程threading类似,都是用在python中进行并行计算的,而多进程则是为了弥补python在多线程中的劣势而出现的。

multiprocessing是使用计算机的多核进行运算,它可以避免多线程中gil的影响。

python使用multiprocessing模块实现多进程,用法和threading基本一致。

1. 添加进程

multiprocessing.process(target, args)

使用multiprocessing.process新建一个进程,target是需要执行的函数,args是需要传入该函数的参数,args接受一个tuple,即使只有一个参数也需要写成(x,)形式

import multiprocessing as mp

def job(a,d):
    print('aaaaa')

if __name__=='__main__':
    p1 = mp.process(target=job,args=(1,2)) # 添加一个进程
    p1.start()
    p1.join()

2. 信息传递:queue队列

多进程中的queue使用同多线程一致,同样为先进先出

多进程可以直接从multiprocessing.queue()导入queue队列。

import multiprocessing as mp

def job(q):
    res=0
    for i in range(1000):
        res+=i+i**2+i**3
    q.put(res)    # 将值放入队列

if __name__=='__main__':
    q = mp.queue() # queue队列
    p1 = mp.process(target=job,args=(q,))
    p2 = mp.process(target=job,args=(q,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    res1 = q.get() # 从队列中取出值
    res2 = q.get() # 从队列中取出值
    print(res1, res2)

3. 进程池

import multiprocessing as mp

def job(x):
    return x*x

def multicore():
    pool = mp.pool() # 定义一个进程池
    res = pool.map(job, range(100))
    print(res)

if __name__=='__main__':
    multicore()

 关于进程池的更多信息请跳转至:

 4. 共享内存

一般的变量在进程之间是没法进行通讯的,multiprocessing 给我们提供了 value 和 array 模块,他们可以在不通的进程*同使用。

value()和array()都接受两个参数,第一个为数据类型,第二个是传入的数。

value()可以接受传入单个数值,array()可以接受传入一个一维数组

import multiprocessing as mp

value1 = mp.value('i', 0) # value接受单个数值,i表示一个带符号的整型
array = mp.array('i', [1, 2, 3, 4]) # array接受一个一维数组

array2 = mp.array('i', [[1,2], [2,3]]) # 传入一个二维数组,错误,传入参数非一维数组

数据类型如下:

type code
c type python type minimum size in bytes
'b' signed char int 1
'b' unsigned char int 1
'u' py_unicode unicode character 2
'h' signed short int 2
'h' unsigned short int 2
'i' signed int int 2
'i' unsigned int int 2
'l' signed long int 4
'l' unsigned long int 4
'q' signed long long int 8
'q' unsigned long long int 8
'f' float float 4
'd'
double
float 
8

5. 进程锁

进程锁同线程锁使用方法一致,lock在不同进程使用同一共享内存时,能够确保进程之间互不影响。

使用lock的方法是:在每个进程执行运算修改共享内存之前执行lock.acquire()将共享内存上锁, 确保当前进程执行时,内存不会被其他进程访问;

执行运算完毕后使用lock.release()将锁打开, 保证其他的进程可以使用该共享内存。

lock.acquire()和lock.release()必须成对出现。 

import multiprocessing as mp

def job(v, num, l):
    l.acquire() # 锁住
    for _ in range(5):
        time.sleep(0.1) 
        v.value += num # 获取共享内存
        print(v.value)
    l.release() # 释放

def multicore():
    l = mp.lock() # 定义一个进程锁
    v = mp.value('i', 0) # 定义共享内存
    p1 = mp.process(target=job, args=(v,1,l)) # 需要将lock传入
    p2 = mp.process(target=job, args=(v,3,l)) 
    p1.start()
    p2.start()
    p1.join()
    p2.join()

if __name__ == '__main__':
    multicore()

 

如何选择多线程/多进程

1. 结论

cpu密集型代码(各种循环处理、计算等等):使用多进程

io密集型代码(文件处理、网络爬虫等):使用多线程

2. 解释

多线程和多进程的理解可以类比于公路。

假设当前公路均为单行道,并且出于安全考虑,一个车道只能同时行驶一辆汽车,一条公路只有一名驾驶员。只有一名指挥者进行集中调度,驾驶员获取到了指挥者的调度信息才会驾驶。

单线程是只有一条公路而且是单车道,只能同时行驶一辆汽车;

多线程是只有一条公路,但是是多车道,可以同时行驶多辆汽车;

多进程是有很多条公路,每条公路可能是单车道也可能是多车道,同样可以同时行驶多辆汽车。

 

因为gil的存在,python中的多线程其实在同一时间只能运行一个线程,就像一名驾驶员只能同时驾驶一辆汽车。四线程类比于一条四车道的公路,但是驾驶员可以从驾驶车道a上的汽车切换至驾驶车道b上的汽车,驾驶员切换的速度够快的话,看起来就像是这条公路上的四辆汽车都在同时行驶。指挥者发布的命令只需要跨越车道就能传递给驾驶员,命令传输的时间损耗相对较小。所以对于多线程,我们希望指挥者可以比较频繁发布命令,驾驶员获取到命令后能够很快就完成然后切换到下一个车道继续执行命令,这样看起来就像是驾驶员同时驾驶四辆汽车了。所以对于io密集型代码,推荐使用多线程。

而对于多进程来说,每条公路都有一名驾驶员,四线程类比于四条公路,则四名驾驶员可以同时驾驶四辆汽车。但指挥者发布的命令需要跨越公路才能传递给驾驶员,命令传输的时间损耗相对较大。所以对于多进程,我们希望指挥者发布一次命令后驾驶员可以执行较长时间,这样就不必把时间过多花费在信息传输上。所以对于cpu密集型代码,推荐使用多进程。 

 

参考资料

1. 

2. python队列queue

3. python多线程(2)——线程同步机制

4. 莫烦python-threading多线程

5. python 多进程锁 多进程共享内存

6. python学习笔记——多进程*享内存value & array

7. 莫烦python-multiprocessing多进程

8. python 之 多进程

9. python多进程 

10. python 使用multiprocessing 特别耗内存

11.