并发编程之多线程
并发编程
- 并发(伪):由于执行速度特别快,人感觉不到
- 并行(真):创建10个人同时操作
线程
- 单进程,单线程的应用程序
- print('666')
- 到底什么是线程?什么是进程
- python自己没有这玩意,python中调用的操作系统的线程和进程(伪线程)
- 多线程
- 工作的最小单元
- 共享进程中所有资源
- 每个线程可以分担一点任务,最终完成最后的结果
python多线程原理:python的多线程实际是一个假的多线程(多个线程在1核cpu上运行,进行快速的切换导致错觉为同时执行)
代码
import threading def func(arg): print(arg) th = threading.thread(target=func) th.start() print('end')
一个应用程序:软件
- 默认一个程序只有一个进程
- 可以有多个进程(默认只有一个),一个进程可以创建多个线程(默认一个)。
python多线程情况下:
- 计算密集型操作:效率低(gil锁)
- io操作:效率高
python多进程情况下:
- 计算密集型操作:效率高(浪费空间)
- io操作:效率高(浪费资源)
以后写python时:
- io密集型用多线程:文件/输入输出/socket
- 计算密集型用多进程:
进程
- 独立开辟内存
- 进程之间的数据隔离
注意:进程是为了提供环境让线程工作
python中线程和进程(gil锁)
gil锁,全局解释器锁。用于限制一个进程中同一个时刻只有一个线程被cpu调度
扩展:默认gil锁在执行100个cpu指令后(过期时间)。
线程的使用
多线程基本使用
- 基础例子
import threading def func(arg): print(arg) th = threading.thread(target=func) th.start() print('end')
- 测试例子(主线程默认会先执行自己的代码,然后等子线程执行完毕后,才会结束)
import time import threading def func(arg): time.sleep(arg) print(arg) t1 = threading.thread(target=func,args=(3,)) t1.start() t2 = threading.thread(target=func,args=(9,)) t2.start() print('end')
- jojn方法:
import time import threading def func(arg): time.sleep(3) print(arg) print('开始执行t1') t1 = threading.thread(target=func,args=(3,)) t1.start() # 无参:让主线程等待,等到子线程t1执行完毕后,才能往下执行 # 有参:让主线程在这里最多等待n秒,无论执行完毕与否,会继续往下走 t1.join(2) print('开始执行t2') t2 = threading.thread(target=func,args=(9,)) t2.start() t2.join() # 让主线程等待,等到子线程t1执行完毕后,才能往下执行 print('end')
- 线程名称获取
import threading def func(arg): # 获取当前执行该函数的线程的对象 t = threading.current_thread() # 根据当前线程对象,获取当前线程名称 name = t.getname() print(name,arg) t1 = threading.thread(target=func,args=(3,)) t1.setname('mhy') t1.start() t2 = threading.thread(target=func,args=(9,)) t2.setname('zz') t2.start() print('end')
- 线程本质
# 先打印3还是end? import threading def func(arg): print(arg) t1 = threading.thread(target=func,args=(3,)) # start是开始线程嘛?不是 # start是告诉cpu,我已准备就绪,你可以调度我了。 t1.start() print('end')
- 面向对象式子多线程使用
import threading class mythread(threading.thread): def run(self): print(123,self._args,self._kwargs) t1 = mythread(args=(11,)) t1.start() t2 = mythread(args=(12,)) t2.start()
- 计算密集型多线程(作用不大)
import threading def func(lis,num): et = [i+num for i in lis] print(et) t1 = threading.thread(target=func,args=([11,22,33],1)) t1.start() t2 = threading.thread(target=func,args=([44,55,66],100)) t2.start()
- 实例代码
import threading import time #定义类继承thread线程 class codingthread(threading.thread): def run(self): for x in range(3): print('正在写代码%s' % threading.current_thread()) time.sleep(1) class drawingthread(threading.thread): def run(self): for x in range(3): print('正在画图%s' % threading.current_thread()) time.sleep(1) def main(): t1 = codingthread() t2 = drawingthread() t1.start() t2.start() if __name__ == '__main__': main()
线程安全(lock)
- 线程安全,多线程操作时,内部会让所有线程排队处理,如:list/dict/queue
- 线程不安全 + 人(lock) = 排队处理
简介:
python自带的解释器是cpython。cpython解释器的多线程实际上是一个假的多线程(在多核cpu中,只能利用一核,不能利用多核)。同一时刻只有一个线程在执行,为了保证同一时刻只有一个线程在执行,在cpython解释器中有一个东西叫做gil(global intepreter lock),叫做全局解释器锁。这个解释器锁是有必要的。因为cpython解释器的内存管理不是线程安全的。当然除了cpython解释器,还有其他的解释器,有些解释器是没有gil锁的,见下面:
- jython:用java实现的python解释器。不存在gil锁。
- ironpython:用.net实现的python解释器。不存在gil锁。
- pypy:用python实现的python解释器。存在gil锁。
gil虽然是一个假的多线程。但是在处理一些io操作(比如文件读写和网络请求)还是可以在很大程度上提高效率的。在io操作上建议使用多线程提高效率。在一些cpu计算操作上不建议使用多线程,而建议使用多进程。
不安全例子
import time import threading lit = [] def func(arg): lit.append(arg) time.sleep(0.04) m = lit[-1] print(arg,m) # m和arg应该是一个值 for num in range(10): t1 = threading.thread(target=func,args=(num,)) t1.start()
锁机制原理
多线程同时执行,会导致数据同时执行,获取数据不符合结果。
线程先获得执行权时,将会将执行过程锁起来,其他线程不能使用,必须要等该线程执行完,其他线程才可获取执行权,执行线程,将解决数据不统一的方法
lock 锁(一次放行一个)
import time import threading lit = [] lock = threading.lock() # 创建一个锁 def func(arg): lock.acquire() # 将该行代码以下代码锁起来,直到遇到release释放 lit.append(arg) time.sleep(0.04) m = lit[-1] print(arg,m) lock.release() # 释放锁 for num in range(10): t1 = threading.thread(target=func,args=(num,)) t1.start()
rlock 递归锁(一次放行多个),
因为lock锁如果有多层锁机制,会造成死锁
现象,所以有了rlock
(递归锁)
代码如下:
import time import threading lit = [] lock = threading.rlock() # 创建一个递归锁 def func(arg): # lock只能解开一个锁,会造成死锁线程 # rlock可以解开两个锁,解决了下面问题 lock.acquire() # 将该行代码以下代码锁起来,直到遇到release释放 lock.acquire() lit.append(arg) time.sleep(0.04) m = lit[-1] print(arg,m) lock.release() # 释放锁 lock.release() # 释放锁 for num in range(10): t1 = threading.thread(target=func,args=(num,)) t1.start()
boundedsemaphore(一次放n个)信号量
import time import threading # 创建一个锁,这个可以支持你同时进行几次锁,默认为1次 lock = threading.boundedsemaphore(3) def func(arg): lock.acquire() print(arg) time.sleep(1) lock.release() for num in range(20): t = threading.thread(target=func,args=(num,)) t.start()
condition(1次放x个数)动态输入
lock版本的生产者与消费者模式可以正常的运行,但是存在一个不足,在消费者中,总是通过while true死循环并且上锁的方式去判断钱够不够,上锁是一个很耗费cpu资源的行为,因此这种方式不是最好的,还有一种更好的方式便是使用threading.condition来实现,threading.condition可以在没有数据的时候处于堵塞等待状态,一旦有合适的数据了,还可以使用notify相关的函数来通知其他处于等待状态的线程。这样可以不用做一些无用的上锁和解锁的操作。可以提高程序的性能。首先对threading.condition相关的函数做个介绍,threading.condition类似threading.lock,可以在修改全局数据的时候进行上锁,也可以在修改完毕后进行解锁。以下将一些常用的函数做个简单的介绍:
- acquire:上锁
- release:解锁
- wait:将当前线程处于等待状态,并且会释放锁。可以被其他线程使用notify和notify_all函数唤醒。被唤醒后继续等待上锁,上锁后执行下面的代码。
- notify:通知某个等待的线程,默认是第1个等待的线程。
- notify_all:通知正在等待的线程。notify和notify_all不会释放锁。并且需要在release之前调用。
# 方法一 import time import threading # 创建一个锁,这个可以支持你同时进行几次锁,默认为1次 lock = threading.condition() def func(arg): print('线程进来了') lock.acquire() lock.wait() print(arg) time.sleep(1) lock.release() for num in range(3): t = threading.thread(target=func,args=(num,)) t.start() while true: num = int(input('>>>:')) lock.acquire() lock.notify(num) lock.release() # 方法二 import time import threading # 创建一个锁,这个可以支持你同时进行几次锁,默认为1次 lock = threading.condition() def xxx(): print('来执行函数了') input('>>>:') return true def func(arg): print('线程进来了') lock.wait_for(xxx) print(arg) time.sleep(1) for num in range(3): t = threading.thread(target=func,args=(num,)) t.start()
event(事件)1次放所有
import threading # 创建一个锁,这个可以支持你同时进行几次锁,默认为1次 lock = threading.event() def func(arg): print('线程进来了') lock.wait() # 变红灯 print(arg) for num in range(10): t = threading.thread(target=func,args=(num,)) t.start() input('>>>') lock.set() # 变绿灯 input('>>>') # 重新回归 红灯状态 lock.clear() for num in range(10): t = threading.thread(target=func,args=(num,)) t.start() input('>>>') lock.set() # 变绿灯 input('>>>')
线程总结
线程安全:列表和字典就是线程安全;
为什么要加锁?
- 非线程安全
- 控制一段代码的时候,每次只能最多同时执行几个
threading.local
为每一个线程创建一个字典键值对,为数据进行隔离
示例代码:
import time import threading pond = threading.local() def func(arg): # 内部会为当前线程创建一个空间用于存储,phone = 自己的值 ,将数据隔离开 pond.phone = arg time.sleep(2) # 取当前线程自己空间取值 print(pond.phone,arg) for num in range(10): t = threading.thread(target=func,args=(num,)) t.start()
线程池
使用concurrent
来创建线程池,使用线程池可以有效的解决线程无止境的问题,用户每一次一个请求都会创建一个线程,这样线程一堆积也会造成程序缓慢,所以,线程池就可以帮我们解决这个问题,线程池可以设定,最多同时执行n个线程,由自己设定。
示例代码:
import time from concurrent.futures import threadpoolexecutor # 创建一个线程池,(最多同时执行5个线程) pool = threadpoolexecutor(5) def func(arg1,arg2): time.sleep(1) print(arg1,arg2) for num in range(5): # 去线程池申请一个线程,让线程执行func函数 pool.submit(func,num,8)
queue线程安全队列:
在线程中,访问一些全局变量,加锁是一个经常的过程。如果你是想把一些数据存储到某个队列中,那么python内置了一个线程安全的模块叫做queue模块。python中的queue模块中提供了同步的、线程安全的队列类,包括fifo(先进先出)队列queue,lifo(后入先出)队列lifoqueue。这些队列都实现了锁原语(可以理解为原子操作,即要么不做,要么都做完),能够在多线程中直接使用。可以使用队列来实现线程间的同步。相关的函数如下:
- 初始化queue(maxsize):创建一个先进先出的队列。
- qsize():返回队列的大小。
- empty():判断队列是否为空。
- full():判断队列是否满了。
- get():从队列中取最后一个数据。
- put():将一个数据放到队列中。
示例代码
#encoding:utf-8 from queue import queue import threading import time # q = queue(4) # 添加4个队列 # q.put(10) #第1个队列插入一条数据 # q.put(4) #第2个队列插入一条数据 # for x in range(4): # q.put(x) # # for x in range(4): # print(q.get()) # print(q.empty()) # print(q.full()) # print(q.qsize()) def set_value(q): index = 0 while true: q.put(index) index +=1 time.sleep(2) def get_value(q): while true: print(q.get()) def main(): q = queue(4) t1 = threading.thread(target=set_value,args=[q]) t2 = threading.thread(target=get_value,args=[q]) t1.start() t2.start() if __name__ == '__main__': main()
生产者消费者模型
模型三部件
- 生产者
- 队列:先进先出
- 栈:后进先出
- 消费者
- 队列
生产者和消费者模型解决了 不用一直等待的问题
使用queue创建队列
示例代码:
import time import threading from queue import queue q = queue() def producer(id): '''生产者''' while true: time.sleep(2) q.put('包子') print('厨师 %s 生产了一个包子'%id) def consumer(id): '''消费者''' while true: time.sleep(1) v1 = q.get() print('顾客 %s 吃了一个包子'%id) for produce in range(1,4): t1 = threading.thread(target=producer,args=(produce,)) t1.start() for consu in range(1,3): t2 = threading.thread(target=consumer,args=(consu,)) t2.start()
总结:
- 操作系统帮助开发者操作硬件
- 程序员写好代码在操作系统上运行(依赖解释器)
- 任务特别多。以前一个一个执行,(串行),现在可以使用多线程
为什么创建线程?
- 由于线程是cpu工作的最小单元,创建线程可以利用多核优势实现并行操作(java,c#)
为啥创建进程?
- 进程和进程之间做数据隔离(java/c)
python
- python中存在一个gil锁。
- 造成:多线程无法利用多核优势
- 解决:开多进程处理(浪费资源)
- 总结:
- io密集型:多线程
- 计算密集型:多进程
- 线程的创建
- thread
- 面向对象继承(threading.thread)
- 其他
- jojn
- setdeanon
- setname
- threading.current_thread()
- 锁
- 获得
- 释放