Python 进阶学习笔记之六:多线程编程
Python 进阶系列笔记第四篇,前置文章链接:
Python 进阶学习笔记之一:内置常用类型及方法
Python 进阶学习笔记之二:常用数据类型(上)
Python 进阶学习笔记之三:常用数据类型(下)
Python 进阶学习笔记之四:高效迭代器工具
Python 进阶学习笔记之五:异步 IO
1. 多线程编程 - threading
在多核服务器上,多线程充分利用多核特性提升效率是很普遍的做法,threading 模块就提供了支持多线程编码的高级 API。threading 模块是在较低级的模块 _thread 基础上建立较高级的线程接口,推荐在 3.7 版本后使用本模块构建普通多线程程序。如果你想让你的应用更好的利用多核计算机的计算性能,推荐你使用 multiprocessing
模块或者 concurrent.futures.ProcessPoolExecutor
类。但是如果你想同时运行多个I/O绑定任务,线程仍然是一个合适的模型。
先代码示例最基本用法:
import threading
import time
local_param = threading.local() # 线程 local 变量是一种能存储当前线程变量的一种容器类型,专门用于多线程变量存储
def do_somthing():
local_param.name = threading.current_thread().getName() # 当前代码示例中,可以使用此方法获取当前线程的名称
print(f"{local_param.name} is doing something...")
time.sleep(1)
print("over")
mt = threading.Thread(target=do_somthing, name="n1") # 一种线程实例话方法
mt2 = threading.Thread(target=do_somthing, name="n2")
mt.start()
mt2.start()
# 输出
n1 is doing something...
n2 is doing something...
over
over
# 用法2
local_param = threading.local()
class MyThread(threading.Thread):
def run(self):
name = self.name # 继承 Thread 的方式实现一个线程,可以直接取线程名称
local_param.name = name
print(f"{local_param.name} is doing something...")
time.sleep(1)
print(f"{name} is over")
t1 = MyThread()
t2 = MyThread()
t1.setName("t1")
t2.setName("t2")
t1.start()
t2.start()
# 输出
n1 is doing something...
n2 is doing something...
over
over
上面示例中就启动了两个线程,具体业务可以拿上面两个写法进行扩展了。下面具体讲解一下原生 线程的一些同步操作工具和语法。
查看模块 threading 的源码文件,查看模块注释可以知道,该模块的设计基于 Java 的线程模型,Python 的 Thread 类只是 Java 的 Thread 类的一个子集;目前还没有优先级,没有线程组,线程还不能被销毁、停止、暂停、恢复或中断。
1.1 线程本地数据
线程本地数据是特定线程的数据。管理线程本地数据,只需要创建一个 local (或者一个子类型)的实例并在实例中储存属性:
mydata = threading.local()
mydata.x = 1
在不同的线程中,实例的值会不同,上面的简例中已经有了基本应用。
1.2 线程对象
线程对象通过 threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
进行实例或者继承它在进行实例,实例后需要调用 start()
方法来启动线程,直到线程内部的 run()
方法执行完成,它都处于 ‘活跃’ 状态,可以用 is_alive()
方法用于检查线程是否存活。Python 3.3 新加入的 daemon
参数,用来标记一个线程是否是守护线程。
1.3 上下文管理
上下文管理,也就是 with 语句,在这个模块提供的带有 acquire() 和 release() 方法的对象,都可以被用作 with 语句的上下文管理器,当进入语句块时 acquire() 方法会被调用,退出语句块时 release() 会被调用。因此,以下片段:
with some_lock:
# do something...
相当于:
some_lock.acquire()
try:
# do something...
finally:
some_lock.release()
Python 的上下文管理特性,在很多场景下使用,最常用的就是文件的打开操作,而且我们自定义的类也可以实现上下文管理,不过这不是这篇的重点,有兴趣的同学参阅专门介绍的文章。下面关于同步组件的介绍,都支持上下文管理,默认都按此特性编写示例代码。
1.4 锁对象 - Lock
如果读过本系列的异步IO章节,应该还记得在异步编程中的各种同步原语,在多线程中,这些同步组件的概念依然适用,只是使用方式略有不同。
threading.Lock:获得一个锁对象,一旦一个线程获得一个锁,会阻塞随后尝试获得锁的线程,直到它被释放。一个 Lock 对象只有两种状态:‘锁定’ 与 ‘非锁定’,一旦被第一次锁定,在释放之前,任何线程都不能再次获取锁。一个线程中嵌套获取锁操作会造成死锁,而且Python的解释器还不一定能检查出来,一定要注意。
import threading
import time
lock = threading.Lock()
class MyThread(threading.Thread):
def run(self):
name = threading.current_thread().getName()
print(f"{name} 等待锁")
with lock:
print(f"{name} 获取锁")
time.sleep(1)
print(f"{name} 释放锁")
t1 = MyThread()
t2 = MyThread()
t1.setName("t1")
t2.setName("t2")
t1.start()
t2.start()
# 输出
t1 等待锁
t1 获取锁
t2 等待锁
t1 释放锁
t2 获取锁
t2 释放锁
1.5 可重入锁 - RLock
重入锁是一个可以被同一个线程多次获取的同步基元组件。在内部,它在基元锁的锁定/非锁定状态上附加了 “所属线程” 和 “递归等级” 的概念。和上面的 Lock 不同,可重入锁就是为一个线程内部嵌套使用设计的,其使用方法如下:
import threading
import time
lock = threading.RLock() # 这里如果是 Lock 对象,程序会产生死锁
class MyThread(threading.Thread):
def run(self):
name = threading.current_thread().getName()
print(f"{name} 等待锁")
with lock:
print(f"{name} 获取锁")
time.sleep(1)
self.__pm()
print(f"{name} 释放锁")
def __pm(self):
name = threading.current_thread().getName()
with lock:
print(f'{name}再次获取一次锁')
t1 = MyThread()
t2 = MyThread()
t1.setName("t1")
t2.setName("t2")
t1.start()
t2.start()
# 输出
t1 等待锁
t1 获取锁
t2 等待锁
t1再次获取一次锁
t1 释放锁
t2 获取锁
t2再次获取一次锁
t2 释放锁
1.6 条件对象 - Condition
threading.Condition(lock=None)
:条件对象,允许一个或多个线程在被其它线程所通知之前进行等待。其参数 lock 若不为空,必须是上面 Lock 或 RLock 的一种,默认内部会使用 RLock。使用场景上,上面的锁机制一般用于相通逻辑对同一个资源的竞争上面,而Condition 一般用于解决类似 “生产者-消费者” 问题模型上。下面让我看一个简单的例子,一个生产者,一个消费者的情况:
import threading
import time
condition = threading.Condition()
item_available = False
def consumer():
global item_available
while True:
with condition:
while not condition_flag():
condition.wait()
print("消费者处理事件")
item_available = False
condition.notify()
print("消费者处理事件完成,唤醒生产者")
time.sleep(2)
def producer():
global item_available
while True:
with condition:
while condition_flag():
condition.wait()
print("生产者处理事件")
item_available = True
condition.notify()
print("生产者处理完成,唤醒消费者")
time.sleep(2)
def condition_flag():
return item_available
c = threading.Thread(target=consumer)
p = threading.Thread(target=producer)
c.start()
p.start()
# 输出
生产者处理事件
生产者处理完成,唤醒消费者
消费者处理事件
消费者处理事件完成,唤醒生产者
生产者处理事件
生产者处理完成,唤醒消费者
消费者处理事件
消费者处理事件完成,唤醒生产者
......
如果是多个消费者,多个生产者场景下,对变量 item_available
要进行同步操作限制,否则可能会出现意想不到的逻辑错误。
1.7 信号量对象 - Semaphore
信号量对象算是计算机科学史上最古老的同步原语之一,一个信号量管理一个内部计数器,该计数器因 acquire() 方法的调用而递减,因 release() 方法的调用而递增。 计数器的值永远不会小于零;当 acquire() 方法发现计数器为零时,将会阻塞,直到其它线程调用 release() 方法。
信号量有两个可用的实现类,一个是普通的:class threading.Semaphore(value=1)
;一个是有界信号量:class threading.BoundedSemaphore(value=1)
;有界信号量通过检查以确保它当前的值不会超过初始值,使用有界信号量能减少这种编程错误:信号量的释放次数多于其请求次数。
import threading
import time
max_conn = threading.BoundedSemaphore(2)
class Business(threading.Thread):
def run(self):
print(f"{self.name} 等待信号量")
with max_conn:
print(f"{self.name} 获取信号量")
time.sleep(2)
print(f"{self.name} 释放信号量")
t1 = Business(name='t1')
t2 = Business(name='t2')
t3 = Business(name='t3')
t4 = Business(name='t4')
t1.start()
t2.start()
t3.start()
t4.start()
# 输出
t1 等待信号量
t1 获取信号量
t2 等待信号量
t3 等待信号量
t2 获取信号量
t4 等待信号量
t1 释放信号量
t3 获取信号量
t2 释放信号量
t4 获取信号量
t3 释放信号量
t4 释放信号量
1.8 事件对象 - Event
事件对象是线程之间通信的最简单机制之一:一个线程发出事件信号,而其他线程等待该信号。
很简单,没什么可说的,上代码:
import threading
import time
from datetime import datetime
event = threading.Event()
class Ping(threading.Thread):
def run(self):
print(f"{self.name} 开始等待 - {datetime.now().strftime('%H:%m:%S')}")
event.wait() # 方法签名是 wait(timeout=None),可以设置等待超时时间
print(f"{self.name} 结束等待 - {datetime.now().strftime('%H:%m:%S')}")
class Pang(threading.Thread):
def run(self):
time.sleep(3)
print(f"{self.name} 触发事件")
event.set()
ping = Ping(name='ping')
pang = Pang(name='pang')
ping.start()
pang.start()
# 输出
ping 开始等待 - 17:09:43
pang 触发事件
ping 结束等待 - 17:09:46
1.9 定时器对象 - Timer
定时器对象 Timer,参照 Java 的 Timer 实现,但功能上相对于Java还是差的不少,Java 的 Timer 可以延迟执行,定期执行,循环延迟执行等,而 threading.Timer 充其量是个延迟执行器,和 Javascript 中的 setTimeout 定时方法用法差不多。
def hello():
print("hello, world")
t = Timer(30.0, hello)
t.start() # 30秒后 "hello, world" 会被打印出来
1.10 栅栏对象 - Barrier
栅栏对象是在 3.2 版本新增的功能,用于应对固定数量的线程需要彼此相互等待的情况。线程调用 wait() 方法后将阻塞,直到所有线程都调用了 wait() 方法。此时所有线程将被同时释放。其大概应用场景类似于一个操场上多个跑道中多个运动员,等指定人数的运动员全部聚集到起跑线后,比赛才能开始,而栅栏就是这一起跑线。
最常见的程序场景就是多个线程之间的执行同步,在某个逻辑点,运行快的要等一下运行慢的,等所有进程都到达这个逻辑点,在一起向下执行。
import threading
import time
barrier = threading.Barrier(3)
def action(delay_time=0):
"""
循环5此后退出
"""
name = threading.current_thread().getName()
loop = 3
while loop > 0:
print("{0} 循环第 {1} 次".format(name, 4-loop))
loop -= 1
time.sleep(delay_time)
barrier.wait()
print("{0} 循环结束".format(name))
t1 = threading.Thread(target=action, name='t1', args=(1,)) # 给线程的传参方式之一:给 delay_time 赋值 1
t2 = threading.Thread(target=action, name='t2', kwargs={'delay_time': 2}) # 给线程的传参方式之二:给 delay_time 赋值 2
t3 = threading.Thread(target=action, name='t2', kwargs={'delay_time': 3})
t1.start()
t2.start()
t3.start()
程序允许输出会看到,允许快的会在 loop 指定的循环次数完成后阻塞主,知道所有线程都循环完成了 loop 次,才会一起打印 循环结束
。
1.11 线程池
如果要使用多线程编程,就要知道对操作系统来说,创建一个线程的代价是相当大,而且操作系统中同时存在的线程数量是有上限的,这个上限也是非常的小,因此当我们的实现逻辑需要频繁创建线程的,就要考虑使用线程池了。
线程池是一个管理线程的容器,在 Python 的实现中,它被设计成为一个有界管理器,其内部是依靠一个 queue.SimpleQueue
来缓存提交的任务,一个set
来管理正在运行的线程。
注:在 Java 中可以有*的线程池,但一般都会设计一种阻塞队列来缓存提交过来的任务,不会无限制的创建线程,Python 中实现的线程池相对于 Java 来说是阉割版的。
class ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())
:线程池声明描述:
- max_workers: 最大同时运行线程数
- thread_name_prefix: 线程名称前缀,3.6 版本新增
- initializer: 每个工作者线程开始处调用的一个可选可调用对象,3.7 版本新增
- initargs: 传递给初始化器的元组参数,3.7 版本新增
from concurrent.futures import thread
import time
import threading
def action(action_name):
name = threading.current_thread().getName()
print(f"{name} 执行完成 {action_name}")
def manger():
with thread.ThreadPoolExecutor(max_workers=3, thread_name_prefix='work') as executor:
for n in range(1, 11):
executor.submit(action, f"action-{n}")
time.sleep(0.5)
if __name__ == '__main__':
manger()
# 输出
work_0 执行完成 action-1
work_0 执行完成 action-2
work_0 执行完成 action-3
work_2 执行完成 action-4
work_0 执行完成 action-5
work_2 执行完成 action-6
work_0 执行完成 action-7
work_2 执行完成 action-8
work_0 执行完成 action-9
work_2 执行完成 action-10
从输出来看,可以看出自始至终只有 3 个线程运行提交的 action。
1.12 事件调度器 - sched
严格来说 sched 提供的不是一种多线程程序,它只是一种根据时间和优先级排序执行的一个调度器。它提供了一个类 class sched.scheduler(timefunc=time.monotonic, delayfunc=time.sleep)
来管理多个任务的执行顺序,其两个参数大部分场景下使用其默认值即可。
在 sched.scheduler 实例中最常用下面几个方法:
-
scheduler.enterabs(time, priority, action, argument=(), kwargs={})
:在调度器运行之前向调度器添加一个事件。time 值是一个能够靠上面那个timefunc
计算出来的数值,如果使用默认的 time.monotonic 方法,这个time的单位是秒。 -
scheduler.enter(delay, priority, action, argument=(), kwargs={})
:调度器运行之前向调度器添加一个事件,delay 是一个延迟时间值,单位同enterabs
方法的 time 参数一样,enter 实现也是调用了enterabs
方法。 -
scheduler.cancel(event)
:终止某个已经加入的事件,如果事件不存在或者已经执行完,会触发 ValueError。 -
scheduler.run(blocking=True)
:启动调度器。
import sched
scheduler = sched.scheduler()
def action(name):
print(f"{name} is running")
if __name__ == '__main__':
scheduler.enter(3, 1, action=action, kwargs={"name": 'worker2'})
scheduler.enter(2, 1, action=action, argument=('worker1',))
scheduler.run()
# 输出
worker1 is running
worker2 is running
1.13 同步队列 - queue
在多个线程之间交换数据场景中,同步队列是很有用的工具。Python 的 queue 模块实现了三种类型的队列,它们的区别仅仅是条目取回的顺序。在 FIFO 队列中,先添加的任务先取回。在 LIFO 队列中,最近被添加的条目先取回(操作类似一个堆栈)。优先级队列中,条目将保持排序( 使用 heapq 模块 ) 并且最小值的条目第一个返回。在内部,这三个类型的队列使用锁来临时阻塞竞争线程;然而,它们并未被设计用于线程的重入性处理。此外,模块实现了一个 “简单的” FIFO 队列类型, SimpleQueue ,这个特殊实现为小功能在交换中提供额外的保障。
queue 模块定义了下列类:
-
class queue.Queue(maxsize=0)
:FIFO 队列构造函数,maxsize 设置队列容量上限,容量到达上限时在插入操作会被阻塞。maxsize <= 0 表示*队列。 -
class queue.LifoQueue(maxsize=0)
:LIFO 队列构造函数。 其他同 Queue -
class queue.PriorityQueue(maxsize=0)
:优先级队列构造函数。 其他同 Queue。一般情况下插入和取出的数据是个元组:(priority_number, data) 。 -
class queue.SimpleQueue
:*的 FIFO 队列构造函数。简单的队列,缺少任务跟踪等高级功能。3.7 版本新增
对满的队列使用非阻塞插入操作或者超时插入操作会引发queue.Full异常,同样对空的队列使用非阻塞获取操作或超时获取操作会引发queue.Empty异常。
队列对象常用方法:
- Queue.qsize():返回队列的瞬间大小。
- Queue.put(item, block=True, timeout=None):将 item 放入队列。如果可选参数 block 是 true 并且 timeout 是 None (默认),则在必要时阻塞至有空闲插槽可用。如果 timeout 是个正数,将最多阻塞 timeout 秒,如果在这段时间没有可用的空闲插槽,将引发 Full 异常。反之 (block 是 false),如果空闲插槽立即可用,则把 item 放入队列,否则引发 Full 异常 ( 在这种情况下,timeout 将被忽略)
- Queue.put_nowait(item):相当于 put(item, False):从队列中移除并返回一个项目。如果可选参数 block 是 true 并且 timeout 是 None (默认值),则在必要时阻塞至项目可得到。如果 timeout 是个正数,将最多阻塞 timeout 秒,如果在这段时间内项目不能得到,将引发 Empty 异常。反之 (block 是 false) , 如果一个项目立即可得到,则返回一个项目,否则引发 Empty 异常 (这种情况下,timeout 将被忽略)
- Queue.get(block=True, timeout=None):相当于 get(False)
- Queue.task_done():与
join()
方法配合使用,每个 get() 被用于获取一个任务, 后续调用 task_done() 告诉队列,该任务的处理已经完成。 - Queue.join():与
task_done()
方法配合使用,阻塞至队列中所有的元素都被接收和处理完毕。
上一篇: 爬虫乱码问题
下一篇: python进阶(4)