欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

python 多线程threading模块

程序员文章站 2022-05-02 12:06:00
...

一、多线程基本概念

  1. 多线程类似于同时执行多个不同程序,多线程运行有如下优点:

    • 使用线程可以把占据长时间的程序中的任务放到后台去处理。
    • 用户界面可以更加吸引人,比如用户点击了一个按钮去触发某些事件的处理,可以弹出一个进度条来显示处理的进度。
    • 程序的运行速度可能加快。
    • 在一些等待的任务实现上如用户输入、文件读写和网络收发数据等,线程就比较有用了。在这种情况下我们可以释放一些珍贵的资源如内存占用等等。
  2. 每个独立的线程有一个程序运行的入口、顺序执行序列和程序的出口。但是线程不能够独立执行,必须依存在进程中,由应用程序提供多个线程执行控制。

  3. 每个线程都有他自己的一组CPU寄存器,称为线程的上下文,该上下文反映了线程上次运行该线程的CPU寄存器的状态。

  4. 指令指针和堆栈指针寄存器是线程上下文中两个最重要的寄存器,线程总是在进程得到上下文中运行的,这些地址都用于标志拥有线程的进程地址空间中的内存。

    1. 线程可以被抢占(中断)。
    2. 在其他线程正在运行时,线程可以暂时搁置(也称为睡眠) – 这就是线程的退让。
  5. 线程可以分为:

    1. 内核线程:由操作系统内核创建和撤销。
    2. 用户线程:不需要内核支持而在用户程序中实现的线程。
  6. 多线程不一定高效

    • 多线程不一定运行在多个核心上,很可能是一个核心在不停的做上下文切换
    • 如果不用多核形式的 ”硬件并发“ ,而采用上下文切换式的 ”软件并发“,并不能提升太多效率。反而可能使效率下降
  7. 使用并发的原因

    • 任务拆分:在编写软件的时候,将相关的代码放在一起,将无关的代码分开,这是一个好主意,这样能够让程序更加容易理解和测试。将程序划分成不同的任务,每个线程执行一个任务或者多个任务,可以将整个程序的逻辑变得更加简单。

    • 提高性能:在两种情况下,并发能够提高性能。

      • 任务并行(task parallelism):将一个单个任务分成若干个部分各自并行运行,从而降低运行时间。

        • 如果一个部分的执行需要使用到另一个任务的执行结果,这个时候并不能很好的并行完成。
        • 如果不能多个线程分配到不同的处理机执行,而是在一个处理机上进行上下文切换,也不能真正提升性能
        • 在一些场合下,虽然多线程不能提升性能,但是可以带来更好的用户体验。比如制作一个下载器软件,如果下载过程的数据传输和UI控制在一个线程中,当下载进行时UI界面就会卡死,用户不能点击取消或者暂停之类的按钮,只能等传输完成才能重新控制UI
      • 数据并行(data parallelism):每个线程在不同的数据部分上执行相同的操作。

二、threading模块的基本使用

import threading

#子线程起始函数
def thread_job():
    print("this is an added Thread,number is {}".format(threading.current_thread()))

def main():
    print(threading.active_count())	#当前**的线程数
    print(threading.enumerate())    #打印当前所有线程信息
    print(threading.currentThread())#打印正在执行这行代码的线程

    added_thread = threading.Thread(target = thread_job)
    added_thread.start()    #启动自定义线程

if __name__ == "__main__":
	main()

(1)threading模块的常用方法

  • threading.active_count():返回当前**的线程数

  • threading.enumerate():返回当前**的线程信息

  • threading.currentThread():返回正在执行这行代码的线对象

  • threading.Thread(target = 函数名):按指定的起始函数创建一个子线程(Thread类对象),返回线程对象

  • added_thread.start():启动added_thread线程

(2)Thread类的常用方法

使用threading.Thread创建子线程,它本质上是一个Thread类对象,它提供了以下常用方法

  • run(): 用以表示线程活动的方法。
  • start():启动线程活动。
  • join([time]): 等待至线程中止。这句代码阻塞调用线程直至其子线程被调用中止
    1. 子线程正常退出
    2. 子线程抛出未处理的异常
    3. 主线程join语句中可选的超时发生。
  • isAlive(): 返回线程是否活动的。
  • getName(): 返回线程名。
  • setName(): 设置线程名

三、join()

  • 格式:join([time]):这将阻塞调用线程,直至被调用线程的join() 方法被调用中止

  • 示例

    • 原程序

      import threading
      import time
      
      #子线程起始函数
      def thread_job():
          print("T1 strat\n")
          for i in range(10):
              time.sleep(0.1)	#线程休眠0.1s
          print("T1 finish\n") 
      
      def main():
          added_thread = threading.Thread(target = thread_job,name = "T1")
          added_thread.start()    #启动自定义线程
          print("all done\n")
      
      if __name__ == "__main__":
      	main()
          
      '''输出
      T1 strat
      
      all done
      
      T1 finish
      '''
      
    • time.sleep(n):线程休眠n秒

    • 这个程序运行中,先输出T1 strat和all done,过一阵才输出T1 finish

      • 这里主线程先结束了,但是子线程过了一会才结束
      • 如果类比C++多线程,可以认为python里的线程默认是detach
    • 下面用 join() 修改这个程序(只加了一句join代码)

      import threading
      import time
      
      #子线程起始函数
      def thread_job():
          print("T1 strat\n")
          for i in range(10):
              time.sleep(0.1)	#线程休眠0.1s
          print("T1 finish\n") 
      
      def main():
          added_thread = threading.Thread(target = thread_job,name = "T1")
          added_thread.start()    #启动自定义线程
          added_thread.join()    #强制主线程等子线程结束才能向下执行
          print("all done\n")
      
      if __name__ == "__main__":
      	main()
          
      '''输出
      T1 strat
      
      T1 finish
      
      all done
      '''
      
      
      • added_thread.join():对一个线程调用.join()方法,要求此线程的程序在这里阻塞,直到它的子线程结束才能继续向下运行

四、setDaemon()

  • 作用:主线程A中,创建了子线程B,并且在主线程A中调用了B.setDaemon(True),这个的意思是,把主线程A设置为守护线程,这时候,要是主线程A执行结束了,就不管子线程B是否完成,一并和主线程A退出。这基本和join是相反的。
  • 特别注意:
    • 必须在start() 方法调用之前设置
    • 要配置为守护线程,要给参数.setDaemon(True)

五、lock()

  • 用lock来进行线程同步

  • lock的方法

    • 定义一个lock:lock = threading.Lock()

    • .acquire():请求临界资源/上锁,多个线程中,同时只能有一个lock成功,其他进程被阻塞(加入阻塞队列,不会忙等)

    • .release():释放临界资源/解锁。这会唤醒一个阻塞队列中的线程

  • 不用lock的示例

    import threading
    import time
    from queue import Queue
    
    def job1():
        global A
        for i in range(10):
            A += 1
            print("job1",A)        
    
    def job2():
        global A
        for i in range(10):
            A += 10
            print("job2",A)
    
    if __name__ == "__main__":
        A = 0   #临界资源
        
        t1 = threading.Thread(target=job1)
        t2 = threading.Thread(target=job2)
    
        t1.start()
        t2.start()
    
        t1.join()
        t2.join()
        
    '''运行结果
    job1 1
    job1 2
    job2 12
    job2 23
    job1 13
    job1 34
    job1 35
    job1 36
    job1 37
    job2 33
    job1 38
    job2 48
    job1 49
    job2 59
    job1 60
    job2 70
    job2 80
    job2 90
    job2 100
    job2 110
    '''
    
    • 没有进行进程同步处理,两个线程一起处理全局变量A
  • 使用互斥锁改写

    import threading
    import time
    from queue import Queue
    
    def job1():
        global A,lock
       
        lock.acquire()	#请求资源
        for i in range(10):
            A += 1
            print("job1",A) 
        lock.release()	#释放资源
    
    def job2():
        global A,lock
    
        lock.acquire()	#请求资源
        for i in range(10):
            A += 10
            print("job2",A)
        lock.release()	#释放资源
    
    if __name__ == "__main__":
        lock = threading.Lock()
        A = 0   #临界资源
        
        t1 = threading.Thread(target=job1)
        t2 = threading.Thread(target=job2)
    
        t1.start()
        t2.start()
    
        t1.join()
        t2.join()
    
    
    '''
    job1 1
    job1 2
    job1 3
    job1 4
    job1 5
    job1 6
    job1 7
    job1 8
    job1 9
    job1 10
    job2 20
    job2 30
    job2 40
    job2 50
    job2 60
    job2 70
    job2 80
    job2 90
    job2 100
    job2 110
    '''
    

六、用queue返回值

  • 子线程是没有返回值的,可以把每个线程结果放在一个队列里,在主线程拿出结果

  • Python 的 Queue 模块中提供了同步的、线程安全的队列类,包括FIFO(先入先出)队列Queue,LIFO(后入先出)队列LifoQueue,和优先级队列 PriorityQueue。这些队列都实现了锁原语,能够在多线程中直接使用,可以使用队列来实现线程间的同步

  • Queue 模块中的常用方法:

    • Queue.qsize() 返回队列的大小
    • Queue.empty() 如果队列为空,返回True,反之False
    • Queue.full() 如果队列满了,返回True,反之False,Queue.fullmaxsize 大小对应
    • Queue.get([block[, timeout]])获取队列,timeout等待时间
    • Queue.get_nowait() 相当Queue.get(False)
    • Queue.put(item) 写入队列,timeout等待时间
    • Queue.put_nowait(item) 相当Queue.put(item, False)
    • Queue.task_done()在完成一项工作之后,Queue.task_done()函数向任务已经完成的队列发送一个信号
    • Queue.join() 实际上意味着等到队列为空,再执行别的操作
  • 示例

    import threading
    import time
    from queue import Queue
    
    #子线程起始函数,对列表里每个元素求平方,通过队列返回
    def job(L,q):
        for i in range(len(L)):
            L[i] = L[i]**2  #每个元素平方
        
        #要返回的值放在队列里返回
        q.put(L)		
    
    def main():
        q = Queue()
        threads = []
        
        data = [[1,2,3],[3,4,5],[4,4,4],[5,5,5]]
        
        for i in range(4):
            t = threading.Thread(target=job,args = (data[i],q))
            t.start()
            threads.append(t)
    
        for thread in threads:
            thread.join()
    
        res = []
        for i in range(4):
            res.append(q.get())
        
        print(res)
        
    
    if __name__ == "__main__":
        main()
    
    • 这个示例线程应该不安全,可能多个线程同时入队(不确定put是不是原语)

    • 如果在入队的地方多入队几次,加延时,可以看到进程的切换

  • 示例2

    import queue
    import threading
    import time
    
    exitFlag = 0
    
    class myThread (threading.Thread):
        def __init__(self, threadID, name, q):
            threading.Thread.__init__(self)
            self.threadID = threadID
            self.name = name
            self.q = q
        def run(self):
            print ("开启线程:" + self.name)
            process_data(self.name, self.q)
            print ("退出线程:" + self.name)
    
    #处理数据
    def process_data(threadName, q):
        #没有退出标志
        while not exitFlag:
            #请求访问队列,上锁
            queueLock.acquire()
            #如果队列非空,出队一个打印
            if not workQueue.empty():
                data = q.get()
                queueLock.release()
                print ("%s processing %s" % (threadName, data))
            #如果队列是空的,直接释放队列
            else:
                queueLock.release()
            time.sleep(1)
    
    threadList = ["Thread-1", "Thread-2", "Thread-3"]
    nameList = ["One", "Two", "Three", "Four", "Five"]
    queueLock = threading.Lock()
    workQueue = queue.Queue(10)
    threads = []
    threadID = 1
    
    # 创建新线程
    for tName in threadList:
        thread = myThread(threadID, tName, workQueue)
        thread.start()
        threads.append(thread)
        threadID += 1
    
    # 请求并填充队列
    queueLock.acquire()
    for word in nameList:
        workQueue.put(word)
    queueLock.release()
    
    # 等待队列清空
    while not workQueue.empty():
        pass
    
    # 通知线程是时候退出
    exitFlag = 1
    
    # 等待所有线程完成
    for t in threads:
        t.join()
    print ("退出主线程")
    
    '''
    开启线程:Thread-1
    开启线程:Thread-2
    开启线程:Thread-3
    Thread-3 processing One
    Thread-1 processing Two
    Thread-2 processing Three
    Thread-3 processing Four
    Thread-1 processing Five
    退出线程:Thread-2
    退出线程:Thread-3
    退出线程:T
    '''
    
    • 这里直接从 threading.Thread 继承创建一个新的子类,并实例化后调用 start() 方法启动新线程,即它调用了线程的 run() 方法