欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Python的多线程

程序员文章站 2024-03-23 18:11:58
...

一、并发和并行

1、并行
parallel,同时做某些事,互不干扰的同一时刻做几件事;
2、并发
concurrency,同时做某些事,强调一个时间段内有这么多事情要处理完;
一对多,一般就产生了并发;多用户集中访问就是高并发;一旦高并发出现问题可能导致服务全线崩溃

二、高并发的解决方案

1、队列queue

天然的最常用的解决方案:排队,先进先出;形成的队列是一个缓冲地带,就是缓冲区buffer;
队列的核心作用:解耦和缓冲
用queue缓冲队列、慢慢处理,不把压力压到窗口上,而是队伍上,逐步追加队伍;就可实现把请求平滑下来,而不是出现小高峰;
或者采用优先队列;不需要锁机制,因为没有争抢
例如queue模块的类Queue、LifoQueue(后进先出)、PriorityQueue(优先队列);buffer常用Queue、PriorityQueue
也可以用双队列解决问题
应用场景:不在意等待时间的情景,简单易用,代价最小;

2、争抢

不文明的争抢模型,一种高效的方案,没有秩序;
只要资源有人争抢,就必须采用锁机制,即一个人占据窗口后,窗口即锁定、不能为其他人提供服务
操作系统里有的资源就采用争抢模型,也是一种很好的解决方案
优点是效率高、资源不空闲,只要有空闲就会有一个等待的请求者立马霸占资源;缺点是有的线程可能永远抢不到资源,有的线程反复都能抢到,比如网速差的人总是看不到页面;对外部用户一般不采用此方案;

3、预处理

第一重要的常用手段,简单理解就是把需求预先准备好,提前多备一些数据,为加快速度一般都是把数据放内存里;一般都会把用户访问频率高的热点数据提前加载,重点是分析热数据、冷数据;
属于缓存的思想:一种把大多数用户需要的数据提前加载,当请求涌来时,这些数据能很快被拿走;预处理思想也叫预加载,一般采用分布式的、在内存中缓存;譬如在自己的内存中用字典实现缓存,当数据规模大,可考虑第三方的redis,分布式;
优点:提高用户访问的速度,从而减少排队的压力
具体做法:最常用的就是字典,例如redis,这种分布式的、用key,value存储数据的数据库

4、并行

可以把请求分散开来,但并行最大的问题是如何分配,无法预知哪个队列速度快,当有新请求时应该分配到哪个队伍;
简单的方式是轮询,但并不是很好的方案;
日常解决方案:一个程序开多个线程;可以通过购买更多服务器,或多开进程、线程实现并行处理,来解决并发问题;

这种增加窗口的拓展都是水平扩展的思想,如果是在硬件级别拓展,成本会上升;
其实底层都是靠硬件支撑,为了方便管理,会进行逻辑划分,就用到虚拟化技术,即云技术,实现逻辑管理,通过弹性拓展、收缩解决水平拓展问题,云技术也可以度量,计量买多少CPU、多少内存,是一种水平拓展方式;不够用就可以买云服务

注意:
1)并行只是解决并发的一种方案,但并行可以水平拓展;
2)如果线程在单CPU上处理,其实是交替处理,就不是并行了,属于串行;
说明:核心数和路数是两码事,2个CPU叫两路,每个CPU可能有多核,每个核心可以认为是一个CPU,单CPU多核相当于多个CPU;主板上可能有多个插槽插CPU,主板上集成的CPU越多,工艺越复杂,越贵,有时买CPU还不如买服务器
3)多数服务器都是多CPU的,服务的部署往往是多机的、分布式的,这都是并行处理

5、提速
提高单个窗口程序的运行效率,在系统内外做程序的优化;
或者提高单机服务器的性能,比如提高CPU运行频率(换超频)、提高网速、增加内存,或单个服务器安装更多的CPU;

这是一种垂直拓展的思想;属于在单机向上提升,在窗口上提升
垂直提升有天花板,制造工艺的代价太大、成本限制,比如CPU频率难以再提升,故常用的还是水平拓展;

6、消息中间件
这是系统外的队列,称为第三方队列,是单独的程序,一般承压能力非常强,会启动一个群集服务、分担压力,而且容量大;从系统外的队列进到系统内部队列后可能还要排队

系统内外队列的区别:
系统内的队列往往和应用程序、内部的窗口和当前系统相关,用的是queue库的队列,而系统外的队列一般属于第三方,无需配备很多queue(窗口),只需配备一个大队列即可
系统外的队列也称消息队列,放在系统和系统之间,这个第三方队列不能替代系统内的queue队列

由于队列的核心作用是解耦和缓冲,系统内和系统外加了一个第三方队列,相当于在系统间放了一个缓冲地带,做系统间解耦,这个队列一加就把程序变成了分布式程序

消息有生产者、消费者,中间加消息队列、解耦同时做缓冲以平滑数据;
常见的消息队列(消息中间件)有RabbitMQ、ActiveMQ(Apache)、RocketMQ(阿里Apache)、Kafka(Apache)都支持分布式
RocketMQ也放到了Apache;大数据领域常用Kafka,ActiveMQ

www.Apache.org 建造了围绕大数据的生态,主要语言是Java,有很多*项目projects和实用工具,比如分布式部署ZooKeeper、Kafka、Redis,都是Apache的项目,这些服务在python有第三方库供使用,只要用接口就可部署好;多学其他框架、减少人工操作;

还有其他的手段解决高并发:

比如多地建服务器(通过DNS引导、解析、分流,把请求引导到各地的服务器,也属于水平拓展,但脱离了机房的概念),比如淘宝,多地均有服务器,并进行数据同步;一般会在用户密集的地方添服务器;

还有就近处理,CDN技术,把常用的静态数据缓存,然后把常需要的数据就近放到跟前,用户获取数据更快;

一般来说不同的并发场景采用不同的策略,策略可能是多种方式的优化组合;
压力小的时候,就用队列、多线程并行,当压力逐渐变大,可考虑预处理、提前缓存,压力再增大,可考虑第三方队列的消息中间件,以及水平拓展、分布式处理,在阿里买虚拟服务器等等;垂直拓展也是一种方式,不过一般会根据成本再策划;
如果涉及到多节点部署,跨机器间的进程同步,可能会用到分布式锁,会用到APACHE的zookeeper(原生Java);

三、进程和线程

1、进程
不是所有的操作系统都有线程,在实现了线程的操作系统中,线程是操作系统能够进行运算调度的最小单位;线程被包含在进程之中,是进程中的实际运作单位;每个进程都有唯一ID;

1)进程Process

一个程序的执行实例就是一个进程;定义是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,一般需分配内存资源、CPU资源;进程是操作系统结构的基础,进程管理是操作系统的核心任务;操作系统要管理进程,例如食堂是一个进程,每个窗口都是线程;

当程序被操作系统加载到内存中、运行起来,就是进程,进程中存放着指令和数据(资源),它是线程的容器;
操作系统的调用:是操作系统暴露出来的功能,非常低级的接口(low level),原始的系统调用一般复杂,一般会封装成高级库;进程和线程的背后原理完全不同;

2)进程和程序的关系

程序是写好的源代码.py文件、或编译后的文件,这些文件存放在磁盘上;进程是放在内存中的、活动的程序,存放着要执行的操作指令和数据
进程就是内存中执行的操作指令和操作的数据(资源),例如python执行文件会形成一个解释器进程,是活动的、放在内存中的;内存中存放着执行的指令,CPU只认机器代码,即指令,进程中也会存放操作资源,如操作的IO端口,数据也是资源

可以理解为进程是线程的容器,进程可以管理线程,进程中实际运作的是线程,进程只是指令和数据的集合、也可称容器,里面包含了线程
正在运行的程序实际是进程,专业术语进程是程序运行在内存中、被操作系统管理的实例,操作系统必须实现进程管理这个子系统

Linux进程有父进程、子进程,从一个进程创建另一个进程,Windows进程是平等关系,不同的操作系统在进程管理的实现上有差异

2、线程

有时被称为轻量级进程(Lightweight Process,LWP),是程序执行流的最小单元;一个标准的线程由线程ID、当前指令指针(PC)、寄存器集合和堆栈组成;
轻量是指线程开启的代价小,占用的资源少,在许多系统中,创建一个线程比进程快10-100倍;但这不是性能指标,并不是说应该多创建线程少创建进程,进程中实际运作的是线程;

3、进程、线程的理解

现代操作系统提出进程的概念,每个进程都认为自己独占所有的计算机硬件资源;
进程是独立的王国,进程间不可随便共享数据,交互起来极其麻烦;同一个进程内的线程可以共享进程的资源,每个线程拥有自己独立的堆栈,线程与线程之间不共享数据,是属于线程的私有数据;
进程和线程是用来解决并发问题的,即资源不够用;

Python中的进程和线程:进程会启动一个解释器进程,线程共享一个解释器进程;
进程是操作系统资源分配和管理的单位,操作系统是做进程管理,虽然也可以管理到进程中的线程,但这些线程实际上还是由进程管理,但操作系统调度的时候是向进程调度其线程,最终调度的还是线程,所以需要做多线程、多干活;
研究线程数因为进程只是资源调度单位,线程才是干活的,所以一般研究线程的状态;

4、线程的状态

就绪(Ready):线程能够被调度运行,正在等待被调度;线程可能是刚刚被创建启动、也可能是刚刚从阻塞中恢复、或着被其他线程抢占;
运行(Running):线程正在运行;
阻塞(Blocked):线程正等待外部事件发生而无法运行,比如调I/O操作、input;
终止(Terminal):线程完成,或被取消、退出;

调度:指CPU运行线程中的指令,一般需要给线程分配CPU时间,轮到某线程时,会把该线程加载到CPU上运行、执行其指令,当分配的CPU时间用完,时间片用完,会丢回去,线程从运行状态转换为就绪状态;
CPU调度哪个线程实际是操作系统调度,并不是CPU真的调度,CPU选择线程的方案有多种,比如争抢、排序、优先,大多数是争抢

CPU采用分时思想,把运行时间切的很小,造成一种并行的假象,每个线程都有执行机会,都能分到时间片,时间片是微秒级别;优先级高,分的时间越多,感觉上就是有的程序运行快,有的运行慢,有优先级可调整该线程获得的CPU资源时间,并不是指CPU一直运行该线程;

CPU是分时的,一个CPU一次只能调度一个线程,单核调度实际上是非同时,只是时间切换很快;
虚拟化的思想,一个资源当多个用:仿佛把一个CPU掰成多个用,做了很多事;CPU实质就是一种虚拟化的思想,让人以为那就是CPU的工作原理,CPU上跑的实际是线程,是源代码被编译后转化的机器指令,也是运作的最小单位;
CPU只调度就绪状态的,当CPU发现某一线程需调用IO进行较慢的操作时,一般不会等待,而是将该线程调整为阻塞状态,等待数据从磁盘上加载完毕,IO完成后再从阻塞状态变成就绪状态,阻塞的线程并不影响其他线程的调度;

Python的解释器也称PVM,Python虚拟机,把代码转换为中间代码,最后跑的都是机器指令;

四、threading模块

这是Python进行线程开发使用的标准库threading,基于线程的并行化处理方案,解决高并发的方案之一

1、Thread类
threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
可以通过类创建一个线程对象,创建好线程对象后,调用start方法,就可启动线程,这种有主线程、工作线程的程序一般称为多线程程序;主线程会一直等工作线程结束;

group:线程组,Python一般不提
target:指线程调用的对象,一般是目标函数,一个线程对象跑一个函数target就行了;
name:为线程取的名字,可给可不给;
args:为目标函数传递的实参,元祖;
kwargs:为目标函数关键字传参,字典;
daemon:是keyword-only

1)线程退出

Python中未提供线程退出的具体方法,但线程可在以下情况退出:
线程函数内语句执行完毕;例如循环break跳出、函数return退出;
线程函数中抛出未处理的异常,直接导致线程崩溃,但退出代码是0,因为主线程正常结束;
可考虑提供一个交互界面,keyboard
Python的线程没有优先级、线程组的概念,也不能被销毁、停止、挂起,也没有恢复、中断;线程库比较简单

2)线程传参

本质上就是函数传参
t = threading.Thread(target=add, name=‘add’, args=(4, 5))
t = threading.Thread(target=add, name=‘add’, args=(5,), kwargs={‘y’: 4})

2、threading的属性和方法

threading模块提供的方法
current_thread() 返回当前线程对象
main_thread() 返回主线程对象
active_count() 当前处于alive状态的线程个数
enumerate() 返回所有活着的线程的列表,不包括已经终止的、未开始的线程
get_ident() 返回当前线程的ID、非0整数,比名字好
其中active_count(),enumerate()方法的返回值还包括主线程

import threading
import time
​
​
def showthreadinfo():
    print("current thread = {}".format(threading.current_thread()))
    print("current thread id = {}".format(threading.get_ident()))
    print("main thread = {}".format(threading.main_thread()))
    print("acive count = {}".format(threading.active_count()))
    print("acive thread list = {}".format(threading.enumerate()))
​
def worker():
    count = 1
    showthreadinfo()
    while True:
        if count > 5:
            break
        count += 1
        print("I'm working")
        time.sleep(0.5)
​
t = threading.Thread(target=worker, name='worker') # 线程对象 
showthreadinfo()
t.start() # 启动
print('==End==')
​
​
# 结果
current thread = <_MainThread(MainThread, started 140735957906304)>
current thread id = 140735957906304
main thread = <_MainThread(MainThread, started 140735957906304)>
acive count = 1
acive thread list = [<_MainThread(MainThread, started 140735957906304)>]
​
current thread = <Thread(worker, started 123145312489472)>
current thread id = 123145312489472
==End==
main thread = <_MainThread(MainThread, stopped 140735957906304)>
acive count = 2
acive thread list = [<_MainThread(MainThread, stopped 140735957906304)>, <Thread(worker, started 123145312489472)>]
I'm working
I'm working
I'm working
I'm working
I'm working

3、Thread实例的属性和方法

name:只是一个标识名,可以重名,getName()、setName()可以获取/设置这个名词
ident:线程ID,是非0整数,线程start启动后才有Id,否则为None,线程退出,此ID依旧可以访问,线程ID可以重复利用;当前进程中线程ID唯一
is_alive():返回线程是否活着
线程name只是一个名称,可以重复;ID必须唯一,但可在线程退出后再利用

import threading
import time
​
​
def worker():
    count = 1
    while True:
        if count > 5:
            break
        time.sleep(1)
        count += 1
        print(threading.current_thread().name)
​
t = threading.Thread(name='worker', target=worker)
print(t.ident) # None
t.start()
​
while True:
    time.sleep(1)
    if t.is_alive():
        print('{} {} alive'.format(t.name, t.ident))
    else:
        print('{} {} dead'.format(t.name, t.ident))
        
​
# 结果
None
workerworker 123145556844544 alive
​
worker 123145556844544 aliveworker
​
workerworker 123145556844544 alive
​
worker 123145556844544 aliveworker
​
workerworker 123145556844544 alive
​
worker 123145556844544 dead # 线程结束,名字和ID依旧可以访问,因为t还在
worker 123145556844544 dead
worker 123145556844544 dead
...

1)start和run

start() :启动一个线程,每一个线程对象必须且只能执行该方法一次;
该方法实现多线程齐头并进,调的是操作系统的接口,属于系统调用
run() :启动线程函数,即执行target函数
若没有start方法、缺乏对线程的创建,则相当于单线程,在主线程运行,单线程是顺序执行

两者区别:start()方法会调用run()方法,而run()方法仅为运行函数

import threading
import time
​
​
def showthreadinfo():
    print("current thread = {}".format(threading.current_thread()))
    print("main thread = {}".format(threading.main_thread()))
    print("acive count = {}".format(threading.active_count()))
​
def worker():
    count = 1
    showthreadinfo()
    while True:
        if count > 3:
            break
        count += 1
        print("I'm working")
        time.sleep(0.5)
​
class MyThread(threading.Thread):
    def start(self):
        print('start~~~~~~~~~~~~~')
        super().start()
​
    def run(self):
        print('run~~~~~~~~~~~~~')
        super().run()
​
t = MyThread(name='worker', target=worker) 
# t.start()
t.run() # 分别执行start或者run方法
print(t.ident)
​
​
# 结果
run~~~~~~~~~~~~~
current thread = <_MainThread(MainThread, started 140735957906304)>
main thread = <_MainThread(MainThread, started 140735957906304)>
acive count = 1
I'm working
I'm working
I'm working
None  # 一个线程,顺序执行
​
# start方法
start~~~~~~~~~~~~~
run~~~~~~~~~~~~~
current thread = <MyThread(worker, started 123145525018624)>
main thread = <_MainThread(MainThread, started 140735957906304)>
acive count = 2
I'm working
123145525018624  # 多线程效果,主线程结果
I'm working
I'm working

使用start方法启动线程,启动了一个新的线程,名字叫做worker运行;但是使用run方法的,并没有启动新的线程,就是在主线程中调用了一个普通的函数而已;因此启动线程必须使用start方法,才能启动多个线程

2)单线程和多线程

start()涉及到在进程中创建一个新线程,返回一个新线程ID,实质会用到操作系统提供的库函数,进行操作系统的系统调用;
单线程是串行,多线程是齐头并进;但串行并不是一定比并行好、比并行快,对硬件设备,有串行口和并行口,硬盘最早是并行口,后来改成串行口(S);
究竟用单线程还是多线程?串行设备还是并行设备?串口还是并口?各有用途;成本决定了不并行;计算机想用并口,但并口的电路处理更复杂,主板制作成本上升,所以更多的是提高单口效率、传输速率,串行有的也采用分时,和线程道理一样;

并行能提高数据处理能力,以免出现一个程序阻塞使得整个程序停滞;
进程一般会开启多线程,但线程并不是越多越好,不仅管理成本、维护成本会变高,线程是要被操作系统调度的,线程越多越会把时间浪费在线程切换上;线程一般几十个就差不多了;进程中的资源就那么多,线程可能需要抢,一般适用就好、不宜过多;
进程也不是越多越好,也有管理负担

五、多线程

多个线程,一个进程中如果有多个线程,就是多线程,实现一种并发;它是一种并发解决方案,快速解决用户请求;不同的线程可以用同样的target函数,函数调用是各自压栈,互不干扰;即使是同一个函数,也要看它在哪个线程执行;

1、主线程和工作线程

一个进程中至少有一个线程,并作为程序的入口,入口的这个线程就是主线程;
Python中:运行的那个模块就会被加载到主线程中、在主线程中运行;例如右键运行t.py,相当于把t模块运行在主线程中;主线程中可开启多个工作线程;

一个进程至少有一个主线程,其他线程称为工作线程;主线程是第一个启动的线程;
主线程一般做一些协调管理工作,工作线程一般完成具体功能;
父线程:如果线程A中启动了一个线程B,A就是B的父线程;
子线程:B就是A的子线程

2、线程安全

1)定义

线程执行一段代码,不会产生不确定的结果,那这段线程就是安全的;
例如:print函数分两步,第一步打印字符串,第二步换行,就在这之间,可能发生线程的切换(顺序可能与预期不同,因为每个线程都是分时间片的,这种交替的效果正好契合了‘ 并行 ’);这说明print函数不是线程安全函数,但print能保证打印的内容不被切开,因为打印时字符串是只读的、不可切割;
注意:只要进行多线程编程,就要考虑线程安全
线程安全有代价,可能会降低效率;线程不安全虽然结果不可预期,但可以采用其他方式尽量避免,其效率较高;

2)输出日志的解决方法

多线程编程时输出日志的方法
不让print打印换行,end=’ ';反正字符串是不可变的类型,它可以作为一个整体不可分割输出
使用标准库的日志处理模块logging;是线程安全的,生成环境代码都使用logging
logging是开发中做日志输出时用到的模块,把事件添加到日志中;事件有消息描述message,事件具有重要性(即级别或严重性);默认级别是warning,级别太低时可能打印不出来
logging.warning(msg, *args, **kwargs)
在根记录器上记录一条带有严重警告的消息,如果记录器没有处理程序,则调用basicConfig()来添加具有预定义格式的控制台处理程序
logging.info(msg, *args, **kwargs)
严重性为INFO,等级较低
用logging.warning输出走的就是标准错误输出stderror,是记录到输出中而不是文件中,由某些程序进行管理;写程序一般都有日志输出;

import threading
import logging
​
def worker():
    for x in range(3):
        #print("{} is running.\n".format(threading.current_thread().name), end='')
        logging.warning("{} is running.".format(threading.current_thread().name))
​
for x in range(1, 5):
    name = "worker{}".format(x)
    t = threading.Thread(name=name, target=worker)
    t.start()
​
    
# 结果
WARNING:root:worker1 is running.
WARNING:root:worker1 is running.
WARNING:root:worker1 is running.
WARNING:root:worker2 is running.
WARNING:root:worker3 is running.
WARNING:root:worker2 is running.
WARNING:root:worker3 is running.
WARNING:root:worker4 is running.
WARNING:root:worker2 is running.
WARNING:root:worker3 is running.
WARNING:root:worker4 is running.
WARNING:root:worker4 is running.

3、daemon和non-daemon

这里的daemon不是linux的守护进程,那是后台运行的进程(幕前属于前台,看不见的是后台,后台进程也叫守护进程)
可以在进程管理器中把不需要的线程关掉;enumerate显示的进程是解释器给看的线程,还有一些其他线程;

1)daemon属性

Python中构造线程的时候,可以设置daemon属性,这个属性必须在start方法前设置好;或者在start之前再setDaemon,但很少这么做;
daemon属性:表示线程是否是daemon线程,这个值必须在start()之前设置,否则引发RuntimeError异常;
isDaemon():是否是daemon线程
setDaemon():设置为daemon线程,必须在start方法之前设置

线程daemon属性,如果设定就是用户的设置,否则就取创建它的父线程的daemon值
例如:某工作线程如果不写daemon,就会从主线程那里继承,而主线程启动一定是non-daemon,从主线程创建、又未设置daemon,均默认non-daemon,主线程必须等待;主线程退出时一定要扫一眼是否还有non-daemon线程,只要有,主线程就必须等待;

import time
import threading
​
def fn():
    print('fn')
​
def bar():
    t = threading.Thread(target=fn)
    print(2,t.isDaemon())
    t.start()
    print('bar')
​
def foo():
    for i in range(8):
        print(i)
    t = threading.Thread(target=bar)
    print(1,t.isDaemon())
    t.start()
​
t = threading.Thread(target=foo, daemon=False) 
t.start()
time.sleep(0.1)
print('Main Thread Exiting') 
​
​
# 结果
0
1
2
3
4
5
6
7
1 False
2 False
fnbar​
Main Thread Exiting

import time
import threading
​
def fn():
    time.sleep(0.2)
    print('fn')
​
def bar():
    t = threading.Thread(target=fn)
    print(2,t.isDaemon())
    t.start()
    print('bar')
​
def foo():
    for i in range(8):
        print(i)
    t = threading.Thread(target=bar)
    print(1,t.isDaemon())
    t.start()
​
t = threading.Thread(target=foo, daemon=True) 
t.start()
print('Main Thread Exiting') 
​
​
# 结果
0Main Thread Exiting
1
2
3
[Finished in 0.0s]

2)daemon和non-daemon

线程具有一个daemon属性,可以设置为True或False,也可以不设置,就默认daemon=None;如果daemon=None,就取当前线程、即父线程的daemon来设置它;父子关系不能乱,否则无法知道是否是daemon线程;
主线程是non-daemon线程,即daemon = False;从主线程创建的所有线程如果不设置daemon属性,则默认都是daemon = False,也就是non-daemon线程;
如果有non-daemon线程,主线程会直到所有non- daemon线程全部结束,此时如果还有daemon线程,主线程需要退出,会结束所有daemon线程,然后退出;进程会进行监测,如果除了主线程,其他都为daemon线程,就会杀掉daemon线程,主线程退出;

主线程一旦退出,整个进程结束,正常结束的话,状态返回码为0;如果异常则为非0;
如果是其他的工作线程意外退出、比如突然崩溃,主线程照常正常退出,状态码为0;
进程的返回码是看主线程的返回码;

关于等待和不等待的示例:

# 示例1,不等待,因为non-daemon还未启动
import time
import threading
​
def bar():
    time.sleep(5)
    print('bar')
​
def foo():
    for i in range(20):
        print(i)
    t = threading.Thread(target=bar, daemon=False)
    t.start()
​
t = threading.Thread(target=foo, daemon=True) 
t.start()
print('Main Thread Exiting')
​
# 结果
0
Main Thread Exiting1

# 示例2,等待,因为存在non-daemon线程、已启动
import time
import threading
​
def bar():
    time.sleep(2)
    print('bar')
​
def foo():
    for i in range(8):
        print(i)
    t = threading.Thread(target=bar, daemon=False)
    t.start()
​
t = threading.Thread(target=foo, daemon=True) 
t.start()
time.sleep(0.1)
print('Main Thread Exiting') 
​
# 结果
0
1
2
3
4
5
6
7
Main Thread Exiting
bar
[Finished in 2.1s]

# 示例1
import time
import threading
​
​
def foo(n):
    for i in range(n):
        print(i)
        time.sleep(1)
​
t1 = threading.Thread(target=foo, args=(5,), daemon=True) # 调换10和20看看效果 
t1.start()
t2 = threading.Thread(target=foo, args=(10,),  daemon=False) #
t2.start()
​
time.sleep(2)
print('Main Thread Exiting')
​
# 结果
0
0
11
​
Main Thread Exiting
2
2
33
​
4
4
5
6
7
8
9


# 示例2
import time
import threading
​
​
def foo(n):
    for i in range(n):
        print(i)
        time.sleep(1)
​
t1 = threading.Thread(target=foo, args=(10,), daemon=True) # 调换10和20看看效果 
t1.start()
t2 = threading.Thread(target=foo, args=(5,),  daemon=False) #
t2.start()
​
time.sleep(2)
print('Main Thread Exiting')
​
# 结果
0
0
1
1
Main Thread Exiting
22
​
33
​
44

4、join方法

实现等待daemon线程,但不是用了join就能解决全部问题,join的坏处是会阻塞某些线程,例如搞混了父子线程继承的话,全设成daemon,可能造成主线程频繁阻塞,如果在主线程中对所有线程都join的话;

join(timeout=None),是线程的标准方法之一,大多数情况都不会设时间,目的是永久阻塞;
一个线程中调用另一个线程的join方法,调用者将被阻塞,直到被调用线程终止;
一个线程可以被join多次;
timeout参数指定调用者等待多久,没有设置超时,就一直等到被调用线程结束;

import time
import threading
​
def foo(n):
    for i in range(n):
        print(i)
        time.sleep(0.1)
​
t1 = threading.Thread(target=foo, args=(4,), daemon=True) 
t1.start()
t1.join() # 当前线程会等待t1,设置join,取消join对比一下
print('Main Thread Exiting')
​
# 设置join结果
0
1
2
3
Main Thread Exiting
[Finished in 0.5s]
​
# 取消join结果
0
Main Thread Exiting
[Finished in 0.1s]

某线程调用了t1的join方法,某线程就必须等t1结束才能继续往下执行,否则一直处于阻塞状态;例如在a线程中调用了b线程的join方法,a线程*阻塞、等待b线程执行完成后,a线程的阻塞才会取消;

线程执行完毕的标志是:函数语句执行完毕、或者函数内抛出了未处理的异常;
创建出来的新线程不受其父线程的管制,不会因为父线程函数执行完毕消亡了就消亡,只有主线程退出时会kill掉daemon线程,不会因为父线程消亡导致子线程消亡;线程被创建出来后就受进程管制;

主线程的退出原则一致没变,只要它执行完毕,就会想退出,然后扫一圈是否有non-daemon;

import threading 
import time
​
def work():
    time.sleep(5)
    print('work')
​
def bar():
    t=threading.Thread(target=work)
    t.start()
    time.sleep(2)
    print('bar') # 执行到这里,t1线程已经执行完毕,主线程不再等,直接打印最后一句
                 # 主线程走完最后一句,发现还有t线程在执行,且t线程为daemon线程,果断kill、结束
​
t1=threading.Thread(target=bar,daemon=True)
t1.start()
t1.join()
​
print('main thread exits')
​
​
# 结果
bar
main thread exits
[Finished in 2.1s]

几个注意点

如果在non-daemon线程A中,对另一个daemon线程B使用了join方法,这个线程B设置成daemon就没有什么意 义了,因为non-daemon线程A总是要等待B;
如果在一个daemon线程C中,对另一个daemon线程D使用了join方法,只能说明C要等待D,主线程退出,C和D 不管是否结束,也不管它们谁等谁,都要被杀掉;因为主线程只等non-daemon

import time
import threading
​
def bar():
    while True:
        time.sleep(1)
        print('bar')
​
def foo():
    print("t1's daemon = {}".format(threading.current_thread().isDaemon()))
    t2 = threading.Thread(target=bar)
    t2.start()
    print("t2's daemon = {}".format(t2.isDaemon()))
    t2.join()
​
t1 = threading.Thread(target=foo, daemon=True)
t1.start()
time.sleep(3)
print('Main Thread Exiting')
​
# 结果
t1's daemon = True
t2's daemon = True
bar
bar
Main Thread Exiting

5、daemon线程的应用场景

daemon thread 这个概念唯一的作用就是,当把一个线程设置为 daemon,它会随主线程的退出而退出;daemon线程简化了程序员手动关闭线程的工作;

主要应用场景有:

后台任务
如发送心跳包、监控,这种场景最多;例如要后台监测5台服务器的运行状态,会开5个线程每隔几秒和服务器做个交互,判断是否健康存活,一旦想关掉,只需关主线程,其他线程设成daemon即可;
心跳包是服务程序,主线程一关,大家一起关掉;

主线程工作才有用的线程
如主线程中维护着公共资源,如果主线程已经清理完毕准备退出,那么工作线程使用这些资源工作也没有意义了,一起退出最合适;

随时可以被终止的线程
如果主线程退出,想所有其它工作线程一起退出,就使用daemon=True来创建工作线程;
比如,开启一个线程定时判断WEB服务是否正常工作,主线程退出,工作线程也没有必须存在了,应该随着主线程退出一起退出;这种daemon线程一旦创建,就可以忘记它了,只用关心主线程什么时候退出就行了,它们就会自动被关闭;写服务的时候常常会设置成daemon线程;

6、threading.local类

多线程中每个线程完成不同的计算任务;如果函数中 x是局部变量,每一个线程的x是独立的,互不干扰;如果使用全局变量,线程之间可能会互相干扰、导致不期望的结果;

1)threading.local()

将这个类实例化得到一个全局对象,实现既能使用全局对象、又能保持每个线程使用不同的数据;不同的线程使用这个对象存储的数据、其他线程看不见;

import threading
import time
​
# 全局对象
global_data = threading.local()
  
def worker():
    global_data.x = 0
    for i in range(100):
        time.sleep(0.0001)
        global_data.x += 1
    print(threading.current_thread(), global_data.x)
​
for i in range(5):
    threading.Thread(target=worker).start()
    
# 结果
# 和使用局部变量效果一样
<Thread(Thread-1, started 123145446285312)> 100<Thread(Thread-3, started 123145456795648)> 
<Thread(Thread-2, started 123145451540480)> 100
100
<Thread(Thread-4, started 123145462050816)> <Thread(Thread-5, started 123145467305984)> 100
100
import threading
​
X = 'abc'
ctx = threading.local() # 注意这个对象所处的线程 
ctx.x = 123
print(ctx, type(ctx), ctx.x)  # <_thread._local object at 0x10e4b97d8> <class '_thread._local'> 123
​
def worker():
    print(X)
    print(ctx)
    print(ctx.x)
    print('working')
​
worker() # 普通函数调用
threading.Thread(target=worker).start() # 另起一个线程
​
# 结果
abc
<_thread._local object at 0x10e4b97d8>
123
working
​
abc
<_thread._local object at 0x10e4b97d8>
Exception in thread Thread-1: 
AttributeError: '_thread._local' object has no attribute 'x'
新线程中能打印ctx,但ctx中看不到x,这个x不能跨线程;

2)threading.local本质

threading.local类构建了一个大字典,存放所有线程相关的字典,定义如下:
{ id(Thread) -> (ref(Thread), thread-local dict) }
每一线程实例的id为key,value为元祖; value中2部分为,线程对象引用,每个线程自己的字典;
运行时,threading.local实例处在不同的线程中,就从大字典中找到与当前线程相关的键值对中的字典,覆盖 threading.local实例的 dict ;这样就可以在不同的线程中,安全地使用线程独有的数据,做到了线程间数据隔离,如同本地变量一样安全;

3)实现线程安全的方法

让全局变量变成局部变量
让全局变量变成threading.local类的实例

7、threading.Timer 定时器

threading.Timer继承自Thread类,用来定义延迟多久后执行一个函数;
这个类也叫定时器,可以延迟执行;

1)threading.Timer

threading.Timer(interval, function, args=None, kwargs=None)
start方法执行之后,Timer对象会处于等待状态,等待了interval秒之后,开始执行function函数;

import threading
import logging
import time
​
FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(level=logging.INFO, format=FORMAT)
​
def worker():
    logging.info('in worker')
    time.sleep(2)
    
t = threading.Timer(4, worker)
t.setName('timer')
# t.cancel()
t.start()
# t.cancel()
while True:
    print(threading.enumerate())
    time.sleep(1)
    
# 结果
[<_MainThread(MainThread, started 140735817569152)>, <Timer(timer, started 123145436123136)>]
[<_MainThread(MainThread, started 140735817569152)>, <Timer(timer, started 123145436123136)>]
[<_MainThread(MainThread, started 140735817569152)>, <Timer(timer, started 123145436123136)>]
[<_MainThread(MainThread, started 140735817569152)>, <Timer(timer, started 123145436123136)>]
2018-10-12 13:35:01,071 timer 123145436123136 in worker
[<_MainThread(MainThread, started 140735817569152)>, <Timer(timer, started 123145436123136)>]
[<_MainThread(MainThread, started 140735817569152)>, <Timer(timer, started 123145436123136)>]
[<_MainThread(MainThread, started 140735817569152)>]
[<_MainThread(MainThread, started 140735817569152)>]
[<_MainThread(MainThread, started 140735817569152)>]
...
​
# 打开第一个cancel
[<_MainThread(MainThread, started 140735817569152)>]
[<_MainThread(MainThread, started 140735817569152)>]
[<_MainThread(MainThread, started 140735817569152)>]
​
# 打开第二个cancel,因为worker还未开始执行
[<_MainThread(MainThread, started 140735817569152)>, <Timer(timer, started 123145404346368)>]
[<_MainThread(MainThread, started 140735817569152)>]
[<_MainThread(MainThread, started 140735817569152)>]
[<_MainThread(MainThread, started 140735817569152)>] 

2)cancel()

Timer提供了cancel方法,用来取消一个未执行的函数,如果上面例子中worker函数已经开始执行,cancel就没有任何效果了;
cancel() 一旦置一,无论在start前后,只要function未执行,即run开始之前,就结束,不执行function

3)Timer 总结

Timer是线程Thread的子类,就是线程类,具有线程的能力和特征;
它的实例是能够延时执行目标函数的线程,在真正执行目标函数之前,都可以cancel它;
cancel方法本质使用Event类实现,改变了Event一个量,这并不代表线程提供了取消的方法;