欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Python 进阶学习笔记之六:多线程编程

程序员文章站 2022-06-04 09:02:24
...

Python 进阶系列笔记第四篇,前置文章链接:
Python 进阶学习笔记之一:内置常用类型及方法
Python 进阶学习笔记之二:常用数据类型(上)
Python 进阶学习笔记之三:常用数据类型(下)
Python 进阶学习笔记之四:高效迭代器工具
Python 进阶学习笔记之五:异步 IO

1. 多线程编程 - threading

在多核服务器上,多线程充分利用多核特性提升效率是很普遍的做法,threading 模块就提供了支持多线程编码的高级 API。threading 模块是在较低级的模块 _thread 基础上建立较高级的线程接口,推荐在 3.7 版本后使用本模块构建普通多线程程序。如果你想让你的应用更好的利用多核计算机的计算性能,推荐你使用 multiprocessing模块或者 concurrent.futures.ProcessPoolExecutor类。但是如果你想同时运行多个I/O绑定任务,线程仍然是一个合适的模型。

先代码示例最基本用法:

import threading
import time

local_param = threading.local()                   # 线程 local 变量是一种能存储当前线程变量的一种容器类型,专门用于多线程变量存储

def do_somthing():
    local_param.name = threading.current_thread().getName()     # 当前代码示例中,可以使用此方法获取当前线程的名称
    print(f"{local_param.name} is doing something...")
    time.sleep(1)
    print("over")

mt = threading.Thread(target=do_somthing, name="n1")               # 一种线程实例话方法
mt2 = threading.Thread(target=do_somthing, name="n2")

mt.start()
mt2.start()

# 输出
n1 is doing something...
n2 is doing something...
over
over
# 用法2
local_param = threading.local()

class MyThread(threading.Thread):
    def run(self):
        name = self.name                                      # 继承 Thread 的方式实现一个线程,可以直接取线程名称
        local_param.name = name
        print(f"{local_param.name} is doing something...")
        time.sleep(1)
        print(f"{name} is over")

t1 = MyThread()
t2 = MyThread()

t1.setName("t1")
t2.setName("t2")

t1.start()
t2.start()

# 输出
n1 is doing something...
n2 is doing something...
over
over

上面示例中就启动了两个线程,具体业务可以拿上面两个写法进行扩展了。下面具体讲解一下原生 线程的一些同步操作工具和语法。

查看模块 threading 的源码文件,查看模块注释可以知道,该模块的设计基于 Java 的线程模型,Python 的 Thread 类只是 Java 的 Thread 类的一个子集;目前还没有优先级,没有线程组,线程还不能被销毁、停止、暂停、恢复或中断。

1.1 线程本地数据

线程本地数据是特定线程的数据。管理线程本地数据,只需要创建一个 local (或者一个子类型)的实例并在实例中储存属性:

mydata = threading.local()
mydata.x = 1

在不同的线程中,实例的值会不同,上面的简例中已经有了基本应用。

1.2 线程对象

线程对象通过 threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)进行实例或者继承它在进行实例,实例后需要调用 start()方法来启动线程,直到线程内部的 run()方法执行完成,它都处于 ‘活跃’ 状态,可以用 is_alive()方法用于检查线程是否存活。Python 3.3 新加入的 daemon参数,用来标记一个线程是否是守护线程。

1.3 上下文管理

上下文管理,也就是 with 语句,在这个模块提供的带有 acquire() 和 release() 方法的对象,都可以被用作 with 语句的上下文管理器,当进入语句块时 acquire() 方法会被调用,退出语句块时 release() 会被调用。因此,以下片段:

with some_lock:
    # do something...

相当于:

some_lock.acquire()
try:
    # do something...
finally:
    some_lock.release()

Python 的上下文管理特性,在很多场景下使用,最常用的就是文件的打开操作,而且我们自定义的类也可以实现上下文管理,不过这不是这篇的重点,有兴趣的同学参阅专门介绍的文章。下面关于同步组件的介绍,都支持上下文管理,默认都按此特性编写示例代码。

1.4 锁对象 - Lock

如果读过本系列的异步IO章节,应该还记得在异步编程中的各种同步原语,在多线程中,这些同步组件的概念依然适用,只是使用方式略有不同。
threading.Lock:获得一个锁对象,一旦一个线程获得一个锁,会阻塞随后尝试获得锁的线程,直到它被释放。一个 Lock 对象只有两种状态:‘锁定’ 与 ‘非锁定’,一旦被第一次锁定,在释放之前,任何线程都不能再次获取锁。一个线程中嵌套获取锁操作会造成死锁,而且Python的解释器还不一定能检查出来,一定要注意。

import threading
import time

lock = threading.Lock()

class MyThread(threading.Thread):

    def run(self):
        name = threading.current_thread().getName()
        print(f"{name} 等待锁")
        with lock:
            print(f"{name} 获取锁")
            time.sleep(1)

        print(f"{name} 释放锁")

t1 = MyThread()
t2 = MyThread()

t1.setName("t1")
t2.setName("t2")

t1.start()
t2.start()

# 输出
t1 等待锁
t1 获取锁
t2 等待锁
t1 释放锁
t2 获取锁
t2 释放锁

1.5 可重入锁 - RLock

重入锁是一个可以被同一个线程多次获取的同步基元组件。在内部,它在基元锁的锁定/非锁定状态上附加了 “所属线程” 和 “递归等级” 的概念。和上面的 Lock 不同,可重入锁就是为一个线程内部嵌套使用设计的,其使用方法如下:

import threading
import time

lock = threading.RLock()          # 这里如果是 Lock 对象,程序会产生死锁

class MyThread(threading.Thread):

    def run(self):
        name = threading.current_thread().getName()
        print(f"{name} 等待锁")
        with lock:
            print(f"{name} 获取锁")
            time.sleep(1)

            self.__pm()

        print(f"{name} 释放锁")

    def __pm(self):
        name = threading.current_thread().getName()
        with lock:
            print(f'{name}再次获取一次锁')

t1 = MyThread()
t2 = MyThread()

t1.setName("t1")
t2.setName("t2")

t1.start()
t2.start()

# 输出
t1 等待锁
t1 获取锁
t2 等待锁
t1再次获取一次锁
t1 释放锁
t2 获取锁
t2再次获取一次锁
t2 释放锁

1.6 条件对象 - Condition

threading.Condition(lock=None):条件对象,允许一个或多个线程在被其它线程所通知之前进行等待。其参数 lock 若不为空,必须是上面 Lock 或 RLock 的一种,默认内部会使用 RLock。使用场景上,上面的锁机制一般用于相通逻辑对同一个资源的竞争上面,而Condition 一般用于解决类似 “生产者-消费者” 问题模型上。下面让我看一个简单的例子,一个生产者,一个消费者的情况:

import threading
import time

condition = threading.Condition()
item_available = False

def consumer():
    global item_available
    while True:
        with condition:
            while not condition_flag():
                condition.wait()

            print("消费者处理事件")
            item_available = False
            condition.notify()
            print("消费者处理事件完成,唤醒生产者")
            time.sleep(2)

def producer():
    global item_available
    while True:
        with condition:
            while condition_flag():
                condition.wait()

            print("生产者处理事件")
            item_available = True
            condition.notify()
            print("生产者处理完成,唤醒消费者")
            time.sleep(2)

def condition_flag():
    return item_available

c = threading.Thread(target=consumer)
p = threading.Thread(target=producer)

c.start()
p.start()

# 输出
生产者处理事件
生产者处理完成,唤醒消费者
消费者处理事件
消费者处理事件完成,唤醒生产者
生产者处理事件
生产者处理完成,唤醒消费者
消费者处理事件
消费者处理事件完成,唤醒生产者
......

如果是多个消费者,多个生产者场景下,对变量 item_available要进行同步操作限制,否则可能会出现意想不到的逻辑错误。

1.7 信号量对象 - Semaphore

信号量对象算是计算机科学史上最古老的同步原语之一,一个信号量管理一个内部计数器,该计数器因 acquire() 方法的调用而递减,因 release() 方法的调用而递增。 计数器的值永远不会小于零;当 acquire() 方法发现计数器为零时,将会阻塞,直到其它线程调用 release() 方法。
信号量有两个可用的实现类,一个是普通的:class threading.Semaphore(value=1);一个是有界信号量:class threading.BoundedSemaphore(value=1);有界信号量通过检查以确保它当前的值不会超过初始值,使用有界信号量能减少这种编程错误:信号量的释放次数多于其请求次数。

import threading
import time

max_conn = threading.BoundedSemaphore(2)

class Business(threading.Thread):

    def run(self):
        print(f"{self.name} 等待信号量")
        with max_conn:
            print(f"{self.name} 获取信号量")
            time.sleep(2)
        print(f"{self.name} 释放信号量")

t1 = Business(name='t1')
t2 = Business(name='t2')
t3 = Business(name='t3')
t4 = Business(name='t4')

t1.start()
t2.start()
t3.start()
t4.start()

# 输出
t1 等待信号量
t1 获取信号量
t2 等待信号量
t3 等待信号量
t2 获取信号量
t4 等待信号量
t1 释放信号量
t3 获取信号量
t2 释放信号量
t4 获取信号量
t3 释放信号量
t4 释放信号量

1.8 事件对象 - Event

事件对象是线程之间通信的最简单机制之一:一个线程发出事件信号,而其他线程等待该信号。
很简单,没什么可说的,上代码:

import threading
import time
from datetime import datetime

event = threading.Event()


class Ping(threading.Thread):

    def run(self):
        print(f"{self.name} 开始等待 - {datetime.now().strftime('%H:%m:%S')}")
        event.wait()                                                                                                    # 方法签名是 wait(timeout=None),可以设置等待超时时间
        print(f"{self.name} 结束等待 - {datetime.now().strftime('%H:%m:%S')}")


class Pang(threading.Thread):

    def run(self):
        time.sleep(3)
        print(f"{self.name} 触发事件")
        event.set()


ping = Ping(name='ping')
pang = Pang(name='pang')

ping.start()
pang.start()

# 输出
ping 开始等待 - 17:09:43
pang 触发事件
ping 结束等待 - 17:09:46

1.9 定时器对象 - Timer

定时器对象 Timer,参照 Java 的 Timer 实现,但功能上相对于Java还是差的不少,Java 的 Timer 可以延迟执行,定期执行,循环延迟执行等,而 threading.Timer 充其量是个延迟执行器,和 Javascript 中的 setTimeout 定时方法用法差不多。

def hello():
    print("hello, world")

t = Timer(30.0, hello)
t.start()           # 30秒后 "hello, world" 会被打印出来

1.10 栅栏对象 - Barrier

栅栏对象是在 3.2 版本新增的功能,用于应对固定数量的线程需要彼此相互等待的情况。线程调用 wait() 方法后将阻塞,直到所有线程都调用了 wait() 方法。此时所有线程将被同时释放。其大概应用场景类似于一个操场上多个跑道中多个运动员,等指定人数的运动员全部聚集到起跑线后,比赛才能开始,而栅栏就是这一起跑线。
最常见的程序场景就是多个线程之间的执行同步,在某个逻辑点,运行快的要等一下运行慢的,等所有进程都到达这个逻辑点,在一起向下执行。

import threading
import time

barrier = threading.Barrier(3)

def action(delay_time=0):
    """
    循环5此后退出
    """
    name = threading.current_thread().getName()
    loop = 3
    while loop > 0:
        print("{0} 循环第 {1} 次".format(name, 4-loop))
        loop -= 1
        time.sleep(delay_time)

    barrier.wait()
    print("{0} 循环结束".format(name))


t1 = threading.Thread(target=action, name='t1', args=(1,))                  # 给线程的传参方式之一:给 delay_time 赋值 1
t2 = threading.Thread(target=action, name='t2', kwargs={'delay_time': 2})   # 给线程的传参方式之二:给 delay_time 赋值 2
t3 = threading.Thread(target=action, name='t2', kwargs={'delay_time': 3})

t1.start()
t2.start()
t3.start()

程序允许输出会看到,允许快的会在 loop 指定的循环次数完成后阻塞主,知道所有线程都循环完成了 loop 次,才会一起打印 循环结束

1.11 线程池

如果要使用多线程编程,就要知道对操作系统来说,创建一个线程的代价是相当大,而且操作系统中同时存在的线程数量是有上限的,这个上限也是非常的小,因此当我们的实现逻辑需要频繁创建线程的,就要考虑使用线程池了。

线程池是一个管理线程的容器,在 Python 的实现中,它被设计成为一个有界管理器,其内部是依靠一个 queue.SimpleQueue来缓存提交的任务,一个set来管理正在运行的线程。

注:在 Java 中可以有*的线程池,但一般都会设计一种阻塞队列来缓存提交过来的任务,不会无限制的创建线程,Python 中实现的线程池相对于 Java 来说是阉割版的。

class ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=()):线程池声明描述:

  • max_workers: 最大同时运行线程数
  • thread_name_prefix: 线程名称前缀,3.6 版本新增
  • initializer: 每个工作者线程开始处调用的一个可选可调用对象,3.7 版本新增
  • initargs: 传递给初始化器的元组参数,3.7 版本新增
from concurrent.futures import thread
import time
import threading

def action(action_name):
    name = threading.current_thread().getName()
    print(f"{name} 执行完成 {action_name}")

def manger():
    with thread.ThreadPoolExecutor(max_workers=3, thread_name_prefix='work') as executor:
        for n in range(1, 11):
            executor.submit(action, f"action-{n}")
            time.sleep(0.5)

if __name__ == '__main__':
    manger()

# 输出
work_0 执行完成 action-1
work_0 执行完成 action-2
work_0 执行完成 action-3
work_2 执行完成 action-4
work_0 执行完成 action-5
work_2 执行完成 action-6
work_0 执行完成 action-7
work_2 执行完成 action-8
work_0 执行完成 action-9
work_2 执行完成 action-10

从输出来看,可以看出自始至终只有 3 个线程运行提交的 action。

1.12 事件调度器 - sched

严格来说 sched 提供的不是一种多线程程序,它只是一种根据时间和优先级排序执行的一个调度器。它提供了一个类 class sched.scheduler(timefunc=time.monotonic, delayfunc=time.sleep)来管理多个任务的执行顺序,其两个参数大部分场景下使用其默认值即可。

在 sched.scheduler 实例中最常用下面几个方法:

  • scheduler.enterabs(time, priority, action, argument=(), kwargs={}):在调度器运行之前向调度器添加一个事件。time 值是一个能够靠上面那个timefunc计算出来的数值,如果使用默认的 time.monotonic 方法,这个time的单位是秒。
  • scheduler.enter(delay, priority, action, argument=(), kwargs={}):调度器运行之前向调度器添加一个事件,delay 是一个延迟时间值,单位同 enterabs方法的 time 参数一样,enter 实现也是调用了 enterabs方法。
  • scheduler.cancel(event):终止某个已经加入的事件,如果事件不存在或者已经执行完,会触发 ValueError。
  • scheduler.run(blocking=True):启动调度器。
import sched

scheduler = sched.scheduler()

def action(name):
    print(f"{name} is running")

if __name__ == '__main__':
    scheduler.enter(3, 1, action=action, kwargs={"name": 'worker2'})
    scheduler.enter(2, 1, action=action, argument=('worker1',))
    scheduler.run()

# 输出
worker1 is running
worker2 is running

1.13 同步队列 - queue

在多个线程之间交换数据场景中,同步队列是很有用的工具。Python 的 queue 模块实现了三种类型的队列,它们的区别仅仅是条目取回的顺序。在 FIFO 队列中,先添加的任务先取回。在 LIFO 队列中,最近被添加的条目先取回(操作类似一个堆栈)。优先级队列中,条目将保持排序( 使用 heapq 模块 ) 并且最小值的条目第一个返回。在内部,这三个类型的队列使用锁来临时阻塞竞争线程;然而,它们并未被设计用于线程的重入性处理。此外,模块实现了一个 “简单的” FIFO 队列类型, SimpleQueue ,这个特殊实现为小功能在交换中提供额外的保障。

queue 模块定义了下列类:

  • class queue.Queue(maxsize=0):FIFO 队列构造函数,maxsize 设置队列容量上限,容量到达上限时在插入操作会被阻塞。maxsize <= 0 表示*队列。
  • class queue.LifoQueue(maxsize=0):LIFO 队列构造函数。 其他同 Queue
  • class queue.PriorityQueue(maxsize=0):优先级队列构造函数。 其他同 Queue。一般情况下插入和取出的数据是个元组:(priority_number, data) 。
  • class queue.SimpleQueue:*的 FIFO 队列构造函数。简单的队列,缺少任务跟踪等高级功能。3.7 版本新增

对满的队列使用非阻塞插入操作或者超时插入操作会引发queue.Full异常,同样对空的队列使用非阻塞获取操作或超时获取操作会引发queue.Empty异常。

队列对象常用方法:

  • Queue.qsize():返回队列的瞬间大小。
  • Queue.put(item, block=True, timeout=None):将 item 放入队列。如果可选参数 block 是 true 并且 timeout 是 None (默认),则在必要时阻塞至有空闲插槽可用。如果 timeout 是个正数,将最多阻塞 timeout 秒,如果在这段时间没有可用的空闲插槽,将引发 Full 异常。反之 (block 是 false),如果空闲插槽立即可用,则把 item 放入队列,否则引发 Full 异常 ( 在这种情况下,timeout 将被忽略)
  • Queue.put_nowait(item):相当于 put(item, False):从队列中移除并返回一个项目。如果可选参数 block 是 true 并且 timeout 是 None (默认值),则在必要时阻塞至项目可得到。如果 timeout 是个正数,将最多阻塞 timeout 秒,如果在这段时间内项目不能得到,将引发 Empty 异常。反之 (block 是 false) , 如果一个项目立即可得到,则返回一个项目,否则引发 Empty 异常 (这种情况下,timeout 将被忽略)
  • Queue.get(block=True, timeout=None):相当于 get(False)
  • Queue.task_done():与 join() 方法配合使用,每个 get() 被用于获取一个任务, 后续调用 task_done() 告诉队列,该任务的处理已经完成。
  • Queue.join():与 task_done() 方法配合使用,阻塞至队列中所有的元素都被接收和处理完毕。
相关标签: Python 进阶