python 多线程threading模块
文章目录
一、多线程基本概念
-
多线程类似于同时执行多个不同程序,多线程运行有如下优点:
- 使用线程可以把占据长时间的程序中的任务放到后台去处理。
- 用户界面可以更加吸引人,比如用户点击了一个按钮去触发某些事件的处理,可以弹出一个进度条来显示处理的进度。
- 程序的运行速度可能加快。
- 在一些等待的任务实现上如用户输入、文件读写和网络收发数据等,线程就比较有用了。在这种情况下我们可以释放一些珍贵的资源如内存占用等等。
-
每个独立的线程有一个程序运行的入口、顺序执行序列和程序的出口。但是线程不能够独立执行,必须依存在进程中,由应用程序提供多个线程执行控制。
-
每个线程都有他自己的一组CPU寄存器,称为线程的上下文,该上下文反映了线程上次运行该线程的CPU寄存器的状态。
-
指令指针和堆栈指针寄存器是线程上下文中两个最重要的寄存器,线程总是在进程得到上下文中运行的,这些地址都用于标志拥有线程的进程地址空间中的内存。
- 线程可以被抢占(中断)。
- 在其他线程正在运行时,线程可以暂时搁置(也称为睡眠) – 这就是线程的退让。
-
线程可以分为:
- 内核线程:由操作系统内核创建和撤销。
- 用户线程:不需要内核支持而在用户程序中实现的线程。
-
多线程不一定高效
- 多线程不一定运行在多个核心上,很可能是一个核心在不停的做上下文切换
- 如果不用多核形式的 ”硬件并发“ ,而采用上下文切换式的 ”软件并发“,并不能提升太多效率。反而可能使效率下降
-
使用并发的原因
-
任务拆分:在编写软件的时候,将相关的代码放在一起,将无关的代码分开,这是一个好主意,这样能够让程序更加容易理解和测试。将程序划分成不同的任务,每个线程执行一个任务或者多个任务,可以将整个程序的逻辑变得更加简单。
-
提高性能:在两种情况下,并发能够提高性能。
-
任务并行(task parallelism):将一个单个任务分成若干个部分各自并行运行,从而降低运行时间。
- 如果一个部分的执行需要使用到另一个任务的执行结果,这个时候并不能很好的并行完成。
- 如果不能多个线程分配到不同的处理机执行,而是在一个处理机上进行上下文切换,也不能真正提升性能
- 在一些场合下,虽然多线程不能提升性能,但是可以带来更好的用户体验。比如制作一个下载器软件,如果下载过程的数据传输和UI控制在一个线程中,当下载进行时UI界面就会卡死,用户不能点击取消或者暂停之类的按钮,只能等传输完成才能重新控制UI
-
数据并行(data parallelism):每个线程在不同的数据部分上执行相同的操作。
-
-
二、threading模块的基本使用
import threading
#子线程起始函数
def thread_job():
print("this is an added Thread,number is {}".format(threading.current_thread()))
def main():
print(threading.active_count()) #当前**的线程数
print(threading.enumerate()) #打印当前所有线程信息
print(threading.currentThread())#打印正在执行这行代码的线程
added_thread = threading.Thread(target = thread_job)
added_thread.start() #启动自定义线程
if __name__ == "__main__":
main()
(1)threading模块的常用方法
-
threading.active_count()
:返回当前**的线程数 -
threading.enumerate()
:返回当前**的线程信息 -
threading.currentThread()
:返回正在执行这行代码的线对象 -
threading.Thread(target = 函数名)
:按指定的起始函数创建一个子线程(Thread类对象),返回线程对象 -
added_thread.start()
:启动added_thread
线程
(2)Thread类的常用方法
使用threading.Thread
创建子线程,它本质上是一个Thread
类对象,它提供了以下常用方法
-
run()
: 用以表示线程活动的方法。 -
start()
:启动线程活动。 -
join([time])
: 等待至线程中止。这句代码阻塞调用线程直至其子线程被调用中止- 子线程正常退出
- 子线程抛出未处理的异常
- 主线程
join
语句中可选的超时发生。
-
isAlive()
: 返回线程是否活动的。 -
getName()
: 返回线程名。 -
setName()
: 设置线程名
三、join()
-
格式:
join([time])
:这将阻塞调用线程,直至被调用线程的join()
方法被调用中止 -
示例
-
原程序
import threading import time #子线程起始函数 def thread_job(): print("T1 strat\n") for i in range(10): time.sleep(0.1) #线程休眠0.1s print("T1 finish\n") def main(): added_thread = threading.Thread(target = thread_job,name = "T1") added_thread.start() #启动自定义线程 print("all done\n") if __name__ == "__main__": main() '''输出 T1 strat all done T1 finish '''
-
time.sleep(n)
:线程休眠n秒 -
这个程序运行中,先输出T1 strat和all done,过一阵才输出T1 finish
- 这里主线程先结束了,但是子线程过了一会才结束
- 如果类比C++多线程,可以认为python里的线程默认是
detach
的
-
下面用
join()
修改这个程序(只加了一句join
代码)import threading import time #子线程起始函数 def thread_job(): print("T1 strat\n") for i in range(10): time.sleep(0.1) #线程休眠0.1s print("T1 finish\n") def main(): added_thread = threading.Thread(target = thread_job,name = "T1") added_thread.start() #启动自定义线程 added_thread.join() #强制主线程等子线程结束才能向下执行 print("all done\n") if __name__ == "__main__": main() '''输出 T1 strat T1 finish all done '''
-
added_thread.join()
:对一个线程调用.join()
方法,要求此线程的程序在这里阻塞,直到它的子线程结束才能继续向下运行
-
-
四、setDaemon()
- 作用:主线程A中,创建了子线程B,并且在主线程A中调用了
B.setDaemon(True)
,这个的意思是,把主线程A设置为守护线程,这时候,要是主线程A执行结束了,就不管子线程B是否完成,一并和主线程A退出。这基本和join是相反的。 - 特别注意:
- 必须在
start()
方法调用之前设置 - 要配置为守护线程,要给参数
.setDaemon(True)
- 必须在
五、lock()
-
用lock来进行线程同步
-
lock的方法
-
定义一个lock:
lock = threading.Lock()
-
.acquire()
:请求临界资源/上锁,多个线程中,同时只能有一个lock成功,其他进程被阻塞(加入阻塞队列,不会忙等) -
.release()
:释放临界资源/解锁。这会唤醒一个阻塞队列中的线程
-
-
不用lock的示例
import threading import time from queue import Queue def job1(): global A for i in range(10): A += 1 print("job1",A) def job2(): global A for i in range(10): A += 10 print("job2",A) if __name__ == "__main__": A = 0 #临界资源 t1 = threading.Thread(target=job1) t2 = threading.Thread(target=job2) t1.start() t2.start() t1.join() t2.join() '''运行结果 job1 1 job1 2 job2 12 job2 23 job1 13 job1 34 job1 35 job1 36 job1 37 job2 33 job1 38 job2 48 job1 49 job2 59 job1 60 job2 70 job2 80 job2 90 job2 100 job2 110 '''
- 没有进行进程同步处理,两个线程一起处理全局变量A
-
使用互斥锁改写
import threading import time from queue import Queue def job1(): global A,lock lock.acquire() #请求资源 for i in range(10): A += 1 print("job1",A) lock.release() #释放资源 def job2(): global A,lock lock.acquire() #请求资源 for i in range(10): A += 10 print("job2",A) lock.release() #释放资源 if __name__ == "__main__": lock = threading.Lock() A = 0 #临界资源 t1 = threading.Thread(target=job1) t2 = threading.Thread(target=job2) t1.start() t2.start() t1.join() t2.join() ''' job1 1 job1 2 job1 3 job1 4 job1 5 job1 6 job1 7 job1 8 job1 9 job1 10 job2 20 job2 30 job2 40 job2 50 job2 60 job2 70 job2 80 job2 90 job2 100 job2 110 '''
六、用queue返回值
-
子线程是没有返回值的,可以把每个线程结果放在一个队列里,在主线程拿出结果
-
Python 的 Queue 模块中提供了同步的、线程安全的队列类,包括FIFO(先入先出)队列Queue,LIFO(后入先出)队列LifoQueue,和优先级队列 PriorityQueue。这些队列都实现了锁原语,能够在多线程中直接使用,可以使用队列来实现线程间的同步
-
Queue 模块中的常用方法:
-
Queue.qsize()
返回队列的大小 -
Queue.empty()
如果队列为空,返回True,反之False -
Queue.full()
如果队列满了,返回True,反之False,Queue.full
与maxsize
大小对应 -
Queue.get([block[, timeout]])
获取队列,timeout等待时间 -
Queue.get_nowait()
相当Queue.get(False) -
Queue.put(item)
写入队列,timeout等待时间 -
Queue.put_nowait(item)
相当Queue.put(item, False) -
Queue.task_done()
在完成一项工作之后,Queue.task_done()函数向任务已经完成的队列发送一个信号 -
Queue.join()
实际上意味着等到队列为空,再执行别的操作
-
-
示例
import threading import time from queue import Queue #子线程起始函数,对列表里每个元素求平方,通过队列返回 def job(L,q): for i in range(len(L)): L[i] = L[i]**2 #每个元素平方 #要返回的值放在队列里返回 q.put(L) def main(): q = Queue() threads = [] data = [[1,2,3],[3,4,5],[4,4,4],[5,5,5]] for i in range(4): t = threading.Thread(target=job,args = (data[i],q)) t.start() threads.append(t) for thread in threads: thread.join() res = [] for i in range(4): res.append(q.get()) print(res) if __name__ == "__main__": main()
-
这个示例线程应该不安全,可能多个线程同时入队(不确定put是不是原语)
-
如果在入队的地方多入队几次,加延时,可以看到进程的切换
-
-
示例2
import queue import threading import time exitFlag = 0 class myThread (threading.Thread): def __init__(self, threadID, name, q): threading.Thread.__init__(self) self.threadID = threadID self.name = name self.q = q def run(self): print ("开启线程:" + self.name) process_data(self.name, self.q) print ("退出线程:" + self.name) #处理数据 def process_data(threadName, q): #没有退出标志 while not exitFlag: #请求访问队列,上锁 queueLock.acquire() #如果队列非空,出队一个打印 if not workQueue.empty(): data = q.get() queueLock.release() print ("%s processing %s" % (threadName, data)) #如果队列是空的,直接释放队列 else: queueLock.release() time.sleep(1) threadList = ["Thread-1", "Thread-2", "Thread-3"] nameList = ["One", "Two", "Three", "Four", "Five"] queueLock = threading.Lock() workQueue = queue.Queue(10) threads = [] threadID = 1 # 创建新线程 for tName in threadList: thread = myThread(threadID, tName, workQueue) thread.start() threads.append(thread) threadID += 1 # 请求并填充队列 queueLock.acquire() for word in nameList: workQueue.put(word) queueLock.release() # 等待队列清空 while not workQueue.empty(): pass # 通知线程是时候退出 exitFlag = 1 # 等待所有线程完成 for t in threads: t.join() print ("退出主线程") ''' 开启线程:Thread-1 开启线程:Thread-2 开启线程:Thread-3 Thread-3 processing One Thread-1 processing Two Thread-2 processing Three Thread-3 processing Four Thread-1 processing Five 退出线程:Thread-2 退出线程:Thread-3 退出线程:T '''
- 这里直接从 threading.Thread 继承创建一个新的子类,并实例化后调用 start() 方法启动新线程,即它调用了线程的 run() 方法
推荐阅读
-
python中的模块
-
Python使用dis模块把Python反编译为字节码的用法详解
-
python基础教程之数字处理(math)模块详解
-
Python中的常用的模块介绍
-
详解Python的collections模块中的deque双端队列结构
-
python str模块
-
Python使用SocketServer模块编写基本服务器程序的教程
-
Python控制多进程与多线程并发数总结
-
python用模块zlib压缩与解压字符串和文件的方法
-
《Python 3》--三引号、math模块、cmath模块、日期和时间、转义字符、字符串运算符、字符串格式化、函数、全局变量和局部变量、匿名函数(lambda))