欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

python之多线程

程序员文章站 2023-12-24 16:24:57
...

多线程

使用,基本方法

传统方式编写的程序是顺序执行的,当我们想要同时执行两个任务时,加入一个任务花费的时间是10秒,另外一个任务需要花费13秒的时间,当两个任务执行完成的时间需要23秒,当我们引入多线程后,完成这两个任务花费的时间会是13秒,也就是说节省了10秒的时间。比如下面的例子,当这两函数执行完成后花费了23秒的时间,但是使用多线程之后仅花费13秒的时间。

def music():
    print('start listen--->', time.ctime())
    time.sleep(10)
    print('end listen----->', time.ctime())


def game():
    print('start game--->', time.ctime())
    time.sleep(13)
    print('end game----->', time.ctime())

使用多线程后的写法之一,通过参数实现,这样我们就可以实现简单的多线程了,这样就可以帮助我们同时执行多个任务了

t1 = threading.Thread(target=music)
t2 = threading.Thread(target=game)
t1.start()
t2.start()

使用继承实现多线程,里面的run()必须要重写,因为多线程执行的函数就是在这里面

class MyThread(threading.Thread):

    def run(self):  # 定义每个线程要运行的函数
        print('----->')
        self.music() # 要做的任务

    def music(self):
        print('start listen--->', time.ctime())
        time.sleep(10)
        print('end listen----->', time.ctime())

通过函数的方式实现多线程解释,构造方法:

Thread(group=None, target=None, name=None, args=(), kwargs={})

group: 线程组,目前还没有实现,库引用中提示必须是None;

target: 要执行的方法;

name: 线程名;

args/kwargs: 要传入方法的参数,就是执行方法需要的参数。

#threading的方法
# 返回当前的线程变量。
print(threading.currentThread())
# 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
print(threading.enumerate())
# 返回当前活着的线程数量
print(threading.activeCount())

 

实例方法: 
  isAlive(): 返回线程是否在运行。正在运行指启动后、终止前。 
  get/setName(name): 获取/设置线程名。 

  start():  线程准备就绪,等待CPU调度
  is/setDaemon(bool): 获取/设置是后台线程(默认前台线程(False))。(在start之前设置)
        如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,主线程和后台线程均停止
        如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止
  start(): 启动线程。 
  join([timeout]): 阻塞当前上下文环境的线程,直到调用此方法的线程终止或到达指定的timeout(可选参数)。
'''
join()阻塞当前上下文环境的线程,直到调用此方法的线程终止或到达指定的timeout,
即使设置了setDeamon(True)主线程依然要等待子线程结束。
'''
'''
serDeamon(False)(默认)前台线程,
主线程执行过程中,前台线程也在进行,
主线程执行完毕后,等待前台线程也执行完成后,主线程停止。
'''
'''
serDeamon(True)后台线程,
主线程执行过程中,后台线程也在进行,
主线程执行完毕后,后台线程不论成功与否,主线程均停止。
'''

 join()方法解释:



def music():
    print('start listen--->', time.ctime())
    time.sleep(3)
    print('end listen----->', time.ctime())


def game():
    print('start game--->', time.ctime())
    time.sleep(5)
    print('end game----->', time.ctime())


if __name__ == '__main__':
    t1 = threading.Thread(target=music)
    t2 = threading.Thread(target=game)
    t1.start()
    t2.start()
    # t1.join()
    # t2.join()
    print('end all')

'''
运行结果:
start listen---> Fri Apr  5 15:57:22 2019
start game---> Fri Apr  5 15:57:22 2019
end all
end listen-----> Fri Apr  5 15:57:25 2019
end game-----> Fri Apr  5 15:57:27 2019
'''

上面的这个例子中没有使用join()方法,可以看到输出的结果,end all在两个线程运行结束之前便打印出来,这意味着线程开始后,程序便顺序向下执行。那么将上面带代码中的join()的注释打开,输出的结果便是

'''
运行结果:
start listen---> Fri Apr  5 16:03:37 2019
start game---> Fri Apr  5 16:03:37 2019
end listen-----> Fri Apr  5 16:03:40 2019
end game-----> Fri Apr  5 16:03:42 2019
end all
'''

这意味着,将这个线程阻塞到主线程之中了,其中join()函数可以设置timeout参数,当到达timeout时间后,不再阻塞。

is/setDaemon(bool): 获取/设置是后台线程(默认前台线程(False))。(在start之前设置)

还是上一个例子,将代码改成这样

if __name__ == '__main__':
    t1 = threading.Thread(target=music)
    t2 = threading.Thread(target=game)
    '''
    如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,主线程和后台线程均停止
    如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止

    '''
    t1.setDaemon(True)
    t2.setDaemon(True)
    t1.start()
    t2.start()
    print('end all')

'''
运行结果:
start listen---> Fri Apr  5 16:07:20 2019
start game---> Fri Apr  5 16:07:20 2019
end all
'''

将setDaemon()的参数设置为false后,运行的结果与未设置join效果相同。

先看下面的例子,看一下结果是什么

import threading
import time


def sub():
    global num
    temp = num
    time.sleep(0.01)
    num = temp - 1


num = 100

l = []

for i in range(100):
    t = threading.Thread(target=sub)
    t.start()
    l.append(t)

for t in l:
    t.join()

print(num)

是不是猜的结果是0?错了,结果不可能是0的,因为我们使用了100个线程,在某个时刻,第一个线程拿到num=100这个变量,然后time.sleep(0.01)时cpu开始切换到下一个线程,此时第二个线程拿到的num也是100,然后又开始执行time.sleep(0.01),cpu切换到下一个线程就这么一直下去,然后在某个时刻,CPU又切换到第一个线程了,开始执行temp-1了,此时num是99,然后切换到线程2,这时线程2拿到的num还是之前的100,执行减操作后num是99,这时候对于这个num的值就不会出现预期的值了。

解决这种现象的方法就是加锁,对于某种公共资源采用加锁的方式来避免这种现象,例如下面改写后的例子

import threading
import time


def sub():
    global num
    lock.acquire() # 加锁,加锁之后告诉CPU,不许切换
    temp = num
    time.sleep(0.01)
    num = temp - 1
    lock.release() # 释放锁,告诉CPU,可以切换了


num = 100

l = []
# 获取一把锁
lock = threading.Lock()

for i in range(100):
    t = threading.Thread(target=sub)
    t.start()
    l.append(t)

for t in l:
    t.join()

print(num)

什么是锁?

由于线程之间随机调度:某线程可能在执行n条后,CPU接着执行其他线程。为了多个线程同时操作一个内存中的资源时不产生混乱,我们使用锁。

Lock(指令锁)是可用的最低级的同步指令。Lock处于锁定状态时,不被特定的线程拥有。Lock包含两种状态——锁定和非锁定,以及两个基本的方法。

可以认为Lock有一个锁定池,当线程请求锁定时,将线程至于池中,直到获得锁定后出池。池中的线程处于状态图中的同步阻塞状态。

死锁


import threading
import time

lockA = threading.Lock()
lockB = threading.Lock()


class mythread(threading.Thread):
    def run(self):
        self.doA()
        self.doB()

        def doA(self):
        lockA.acquire()
        print(self.getName(), 'doA()', 'getA', time.ctime())
        time.sleep(1)
        lockB.acquire()
        print(self.getName(), 'doA()', 'getB', time.ctime())
        time.sleep(2)
        lockB.release()
        print(self.getName(), 'doA()', 'releaseB', time.ctime())
        lockA.release()
        print(self.getName(), 'doA()', 'releaseA', time.ctime())

    def doB(self):
        lockB.acquire()
        print(self.getName(), 'doB()', 'getB', time.ctime())
        time.sleep(1)
        lockA.acquire()
        print(self.getName(), 'doB()', 'getA', time.ctime())
        time.sleep(2)
        lockA.release()
        print(self.getName(), 'doB()', 'releaseA', time.ctime())
        lockB.release()
        print(self.getName(), 'doB()', 'releaseB', time.ctime())



if __name__ == '__main__':
    tl = []
    for i in range(5):
        t = mythread()
        tl.append(t)
    for t in tl:
        t.start()

'''
运行结果分析:
5个线程同时抢占A锁,此时Thread-1抢到了
Thread-1 doA() getA Fri Apr  5 18:01:55 2019
Thread-1 doA() getB Fri Apr  5 18:01:56 2019
Thread-1 doA() releaseB Fri Apr  5 18:01:58 2019
Thread-1 doA() releaseA Fri Apr  5 18:01:58 2019

Thread-1执行完doA()后,释放了BA锁,然后Thread-2抢到了A锁,Thread-2睡眠
Thread-2 doA() getA Fri Apr  5 18:01:58 2019

在Thread-2抢到A锁后睡眠的时间内,Thread-1执行doB()抢到了B锁,然后睡眠
Thread-1的睡眠时间结束,获得B锁
Thread-1 doB() getB Fri Apr  5 18:01:58 2019

这时Thread-2睡眠时间结束,准备获得B锁,但是由于Thread-1抢到了B锁但是还没释放
会卡到这里等待Thread-1释放B锁,等待Thread-1睡眠时间结束,然后Thread-1准备获得A锁,
但是此时A锁被Thread-2获得,所以Thread-1又在等待Thread-2把A锁释放
然后Thread-1和Thread-2都在等待对方释放各自要抢占的锁,就这样导致了死锁
PS:可以得出Lock属于全局,每个线程均可抢占

'''

 如何解决这种现象,那么可以使用递归锁的方式来解决

RLock(可重入锁)是一个可以被同一个线程请求多次的同步指令。
RLock使用了“拥有的线程”和“递归等级”的概念,处于锁定状态时,RLock被某个线程拥有。
拥有RLock的线程可以再次调用acquire(),释放锁时需要调用release()相同次数。

可以认为RLock包含一个锁定池和一个初始值为0的计数器,
每次成功调用 acquire()/release(),计数器将+1/-1,为0时锁处于未锁定状态。

 改成下面这样即可

r_lock = threading.RLock()


class mythread(threading.Thread):
    def run(self):
        self.doA()
        self.doB()

    def doA(self):
        r_lock.acquire()
        time.sleep(1)
        r_lock.acquire()
        time.sleep(2)
        r_lock.release()
        r_lock.release()

    def doB(self):
        r_lock.acquire()
        time.sleep(1)
        r_lock.acquire()
        time.sleep(2)
        r_lock.release()
        r_lock.release()

Event( 事件)

Event(事件)是最简单的线程通信机制之一:一个线程通知事件,其他线程等待事件。
Event内置了一个初始为False的标志,
当调用set()时设为True,调用clear()时重置为 False。
wait()将阻塞线程至等待阻塞状态。

Event其实就是一个简化版的 Condition。Event没有锁,无法使线程进入同步阻塞状态。

构造方法:
Event()

实例方法:
isSet(): 当内置标志为True时返回True。
set(): 将标志设为True,并通知所有处于等待阻塞状态的线程恢复运行状态。
clear(): 将标志设为False。
wait([timeout]): 如果标志为True将立即返回,否则阻塞线程至等待阻塞状态,等待其他线程调用set()。

import threading, time



class Signal(threading.Thread):
    def run(self):
        print('信号灯:现在是红灯')
        # print(event.isSet())  # 是否已经设置
        time.sleep(3)
        event.set()
        print('信号灯:现在是绿灯了')
        # print(event.isSet())
        event.set()


class Car(threading.Thread):
    def run(self):
        print('车:红灯了,我不能走了')
        event.wait()
        print('车:绿灯了,我可以过了')
        event.clear()
        event.wait()


if __name__ == "__main__":
    event = threading.Event()
    threads = []
    for i in range(5):
        threads.append(Car())
    threads.insert(0, Signal())
    for t in threads:
        t.start()
    for t in threads:
        t.join()

 信号量

信号量用来控制线程并发数的,Semaphore管理一个内置的计数 器,
每当调用acquire()时-1,调用release()时+1。
计数器不能小于0,当计数器为 0时,acquire()将阻塞线程至同步锁定状态,
直到其他线程调用release()。(类似于停车位的概念)

import threading, time



class MyThread(threading.Thread):
    def run(self):
        if semaphore.acquire():
            print(self.name)
            time.sleep(5)
            semaphore.release()


if __name__ == "__main__":
    semaphore = threading.Semaphore(5)  # 每次可以运行五个
    thrs = []
    for i in range(100):
        thrs.append(MyThread())
    for t in thrs:
        t.start()

多线程之队列

q.put(item, block=True, timeout=None) 放入队列中,如果q设置了大小,当队列满时,将会阻塞
block为True时,将会阻塞,如果设置timeout,经过timeout秒后,如果队列依旧是满的,则会报错

q.qsize() 返回队列的大小
q.empty() 如果队列为空,返回True,反之False
q.full() 如果队列满了,返回True,反之False
q.full 与 maxsize 大小对应
q.get([block[, timeout]]) 获取队列,block是否阻塞,timeout等待时间

q.get_nowait() 相当q.get(False)
q.put_nowait(item) 相当q.put(item, False)
q.task_done() 在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号
q.join() 实际上意味着等到队列为空,再执行别的操作

对于优先级队列
q.put([1232, 121])[优先级,值]

import queue

q = queue.Queue(5)  # 先进先出队列
q1 = queue.LifoQueue()  # 后进先出
q2 = queue.PriorityQueue()  # 优先级队列级别越低越先出来

q.put(1)
q.put(2)
q.put(3)
q.put(4)
q.put(5)
while not q.empty():
    print(q.get())


q1.put(1)
q1.put(2)
q1.put(3)
q1.put(4)
while not q1.empty():
    print(q1.get())


q2.put([1232, 121])
q2.put([4, 124])
q2.put([3, 123])
q2.put([2, 122])
while not q2.empty():
    print(q2.get())

 

上一篇:

下一篇: