欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

【2020Python修炼记】python并发编程(五)多线程-应用部分

程序员文章站 2022-07-09 19:09:08
【目录】 一、 threading模块介绍 二 、开启线程的两种方式 三 、在一个进程下开启多个线程与在一个进程下开启多个子进程的区别 四、 线程相关的其他方法 五、守护线程 六、Python GIL(Global Interpreter Lock) 八、同步锁 九、死锁现象与递归锁 十、线程que ......

【目录】

一、 threading模块介绍

二 、开启线程的两种方式

三 、在一个进程下开启多个线程与在一个进程下开启多个子进程的区别

四、 线程相关的其他方法

五、守护线程 

六、python gil(global interpreter lock)

八、同步锁

九、死锁现象与递归锁

十、线程queue

 

一、 threading模块介绍

进程的multiprocess模块,完全模仿了threading模块的接口,二者在使用层面,有很大的相似性,此处省略好多字。。

multiprocess模块用法回顾:

二 、开启线程的两种方式

1、实例化类创建对象

#方式一
from threading import thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=thread(target=sayhi,args=('egon',))
    t.start()
    print('主线程')

 

2、类的继承

#方式二
from threading import thread
import time
class sayhi(thread):
    def __init__(self,name):
        super().__init__()
        self.name=name
    def run(self):
        time.sleep(2)
        print('%s say hello' % self.name)


if __name__ == '__main__':
    t = sayhi('egon')
    t.start()
    print('主线程')

 

三 、在一个进程下开启多个线程与在一个进程下开启多个子进程的区别

1、比较开启速度

from threading import thread
from multiprocessing import process
import os

def work():
    print('hello')

if __name__ == '__main__':
    #在主进程下开启线程
    t=thread(target=work)
    t.start()
    print('主线程/主进程')
    '''
    打印结果:
    hello
    主线程/主进程
    '''

    #在主进程下开启子进程
    t=process(target=work)
    t.start()
    print('主线程/主进程')
    '''
    打印结果:
    主线程/主进程
    hello
    '''

2、pid

from threading import thread
from multiprocessing import process
import os

def work():
    print('hello',os.getpid())

if __name__ == '__main__':
    #part1:在主进程下开启多个线程,每个线程都跟主进程的pid一样
    t1=thread(target=work)
    t2=thread(target=work)
    t1.start()
    t2.start()
    print('主线程/主进程pid',os.getpid())

    #part2:开多个进程,每个进程都有不同的pid
    p1=process(target=work)
    p2=process(target=work)
    p1.start()
    p2.start()
    print('主线程/主进程pid',os.getpid())

3、同一进程内的线程之间共享进程内的数据

from  threading import thread
from multiprocessing import process
import os
def work():
    global n
    n=0

if __name__ == '__main__':
    # n=100
    # p=process(target=work)
    # p.start()
    # p.join()
    # print('主',n) #毫无疑问子进程p已经将自己的全局的n改成了0,但改的仅仅是它自己的,查看父进程的n仍然为100


    n=1
    t=thread(target=work)
    t.start()
    t.join()
    print('主',n) #查看结果为0,因为同一进程内的线程之间共享进程内的数据

 

四、 线程相关的其他方法

1、thread实例对象的方法:

1、isalive(): 返回线程是否活动的,存活的。
2、getname(): 返回线程名。
3、setname(): 设置线程名。

2、threading模块提供的一些方法:

1、threading.currentthread()  

返回当前的线程变量。

2、threading.enumerate() 

返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。

3、threading.activecount()

返回正在运行的线程数量,与 len(threading.enumerate()) 有相同的结果。

from threading import thread
import threading
from multiprocessing import process
import os

def work():
    import time
    time.sleep(3)
    print(threading.current_thread().getname())


if __name__ == '__main__':
    #在主进程下开启线程
    t=thread(target=work)
    t.start()

    print(threading.current_thread().getname())
    print(threading.current_thread()) #主线程
    print(threading.enumerate()) #连同主线程在内有两个运行的线程
    print(threading.active_count())
    print('主线程/主进程')

    '''
    打印结果:
    mainthread
    <_mainthread(mainthread, started 140735268892672)>
    [<_mainthread(mainthread, started 140735268892672)>, <thread(thread-1, started 123145307557888)>]
    主线程/主进程
    thread-1
    '''

3、join()的使用——主线程等待子线程结束

from threading import thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=thread(target=sayhi,args=('egon',))
    t.start()
    t.join()
    print('主线程')
    print(t.is_alive())
    '''
    egon say hello
    主线程
    false
    '''

 

五、守护线程 

1、强调

无论是进程还是线程,都遵循:守护xxx会等待主xxx运行完毕后被销毁

 

需要强调的是:运行完毕(真正的任务已经做完,但还没有下班) 并非 终止运行(任务已经完成,收拾好东西下班了)

#1.对主进程来说,运行完毕指的是主进程代码运行完毕

#2.对主线程来说,运行完毕指的是主线程所在的进程内所有非守护线程统统运行完毕,主线程才算运行完毕

详细解释:

#1 主进程在其代码结束后就已经算运行完毕了(守护进程在此时就被回收),然后主进程会一直等非守护的子进程都运行完毕后回收子进程的资源(否则会产生僵尸进程),才会结束运行。

#2 主线程在其他非守护线程运行完毕后才算运行完毕(守护线程在此时就被回收)。
因为主线程的结束意味着进程的结束,进程整体的资源都将被回收,而进程必须保证非守护线程都运行完毕后才能结束。

2、两颗栗子

from threading import thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=thread(target=sayhi,args=('egon',))
    t.setdaemon(true) #必须在t.start()之前设置
    t.start()

    print('主线程')
    print(t.is_alive())
    '''
    主线程
    true
    '''
from threading import thread
import time
def foo():
    print(123)
    time.sleep(1)
    print("end123")

def bar():
    print(456)
    time.sleep(3)
    print("end456")


t1=thread(target=foo)
t2=thread(target=bar)

t1.daemon=true
t1.start()
t2.start()
print("main-------")

 

六、python gil (global interpreter lock)

1、强调

# 在cpython解释器中,同一个进程下开启的多线程,同一时刻只能有一个线程执行,无法利用多核优势.

# gil 并不是python的特性,它是在实现python解释器(cpython)时所引入的一个概念,

    即 gil 是 python解释器(cpython)的特性。

2、gil

gil 本质就是一把互斥锁,既然是互斥锁,所有互斥锁的本质都一样,都是将并发运行变成串行,以此来控制同一时间内共享数据只能被一个任务所修改,进而保证数据安全。有了gil的存在,同一时刻同一进程中只有一个线程被执行

可以肯定的一点是:保护不同的数据的安全,就应该加不同的锁。

参考阅读:

 

八、同步锁

1、三个需要注意的点:

#1.gil & lock

线程抢的是gil锁,gil锁相当于执行权限,拿到执行权限后才能拿到互斥锁lock

其他线程也可以抢到gil,但如果发现lock仍然没有被释放则阻塞,即便是拿到执行权限gil也要立刻交出来。

(gil锁 和 互斥锁lock 都抢到 ,才是王道)

#2. join() & lock 

join是等待所有,即整体串行,而只是锁住修改共享数据的部分,即部分串行

要想保证数据安全的根本原理在于让并发变成串行,join与互斥锁都可以实现,毫无疑问,互斥锁的部分串行效率要更高

#3. gil与互斥锁的经典分析,见本小节末---必看

 

2、python已经有一个gil来保证同一时间只能有一个线程来执行了,为什么这里还需要lock? 

一个共识 —— 的目的是为了保护共享的数据,同一时间只能有一个线程来修改共享的数据

一个结论 —— 保护不同的数据就应该加不同的锁

明朗答案 —— gil 与lock是两把锁,保护的数据不一样,

      前者 gil 是解释器级别的(当然保护的就是解释器级别的数据,比如垃圾回收的数据),

      后者lock 保护用户自己开发的应用程序的数据,很明显gil不负责这件事,只能用户自定义加锁处理,即lock

 

 3、gil锁  vs  互斥锁lock ——代码栗子

锁通常被用来实现对共享资源的同步访问。

为每一个共享资源创建一个lock对象,当你需要访问该资源时,调用acquire方法来获取锁对象

(如果其它线程已经获得了该锁,则当前线程需等待其被释放),待资源访问完后,再调用release方法释放锁:

import threading

r=threading.lock()

r.acquire()
'''
对公共数据的操作
'''
r.release()
from threading import thread
import os,time
def work():
    global n
    temp=n
    time.sleep(0.1)
    n=temp-1
if __name__ == '__main__':
    n=100
    l=[]
    for i in range(100):
        p=thread(target=work)
        l.append(p)
        p.start()
    for p in l:
        p.join()

    print(n) #结果可能为99
from threading import thread,lock
import os,time
def work():
    global n
    lock.acquire()
    temp=n
    time.sleep(0.1)
    n=temp-1
    lock.release()
if __name__ == '__main__':
    lock=lock()
    n=100
    l=[]
    for i in range(100):
        p=thread(target=work)
        l.append(p)
        p.start()
    for p in l:
        p.join()

    print(n) #结果肯定为0,由原来的并发执行变成串行,牺牲了执行效率保证了数据安全

gil锁与互斥锁综合分析:
#1.100个线程去抢gil锁,即抢执行权限

#2. 肯定有一个线程先抢到gil(暂且称为线程1),然后开始执行,一旦执行就会拿到lock.acquire()

#3. 极有可能线程1还未运行完毕,就有另外一个线程2抢到gil,然后开始运行,但线程2发现互斥锁lock还未被线程1释放,于是阻塞,*交出执行权限,即释放gil

#4.直到线程1重新抢到gil,开始从上次暂停的位置继续执行,直到正常释放互斥锁lock,然后其他的线程再重复2 3 4的过程

4、让并发变成串行——互斥锁 和 join()的区别

join是等待所有,即整体串行,而锁只是锁住修改共享数据的部分,即部分串行,

要想保证数据安全的根本原理在于让并发变成串行,join与互斥锁都可以实现,毫无疑问,互斥锁的部分串行效率要更高

#不加锁:并发执行,速度快,数据不安全
from threading import current_thread,thread,lock
import os,time
def task():
    global n
    print('%s is running' %current_thread().getname())
    temp=n
    time.sleep(0.5)
    n=temp-1


if __name__ == '__main__':
    n=100
    lock=lock()
    threads=[]
    start_time=time.time()
    for i in range(100):
        t=thread(target=task)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()

    stop_time=time.time()
    print('主:%s n:%s' %(stop_time-start_time,n))

'''
thread-1 is running
thread-2 is running
......
thread-100 is running
主:0.5216062068939209 n:99
'''


#不加锁:未加锁部分并发执行,加锁部分串行执行,速度慢,数据安全
from threading import current_thread,thread,lock
import os,time
def task():
    #未加锁的代码并发运行
    time.sleep(3)
    print('%s start to run' %current_thread().getname())
    global n
    #加锁的代码串行运行
    lock.acquire()
    temp=n
    time.sleep(0.5)
    n=temp-1
    lock.release()

if __name__ == '__main__':
    n=100
    lock=lock()
    threads=[]
    start_time=time.time()
    for i in range(100):
        t=thread(target=task)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    stop_time=time.time()
    print('主:%s n:%s' %(stop_time-start_time,n))

'''
thread-1 is running
thread-2 is running
......
thread-100 is running
主:53.294203758239746 n:0
'''

#有的同学可能有疑问:既然加锁会让运行变成串行,那么我在start之后立即使用join,就不用加锁了啊,也是串行的效果啊
#没错:在start之后立刻使用jion,肯定会将100个任务的执行变成串行,毫无疑问,最终n的结果也肯定是0,是安全的,但问题是
#start后立即join:任务内的所有代码都是串行执行的,而加锁,只是加锁的部分即修改共享数据的部分是串行的
#单从保证数据安全方面,二者都可以实现,但很明显是加锁的效率更高.
from threading import current_thread,thread,lock
import os,time
def task():
    time.sleep(3)
    print('%s start to run' %current_thread().getname())
    global n
    temp=n
    time.sleep(0.5)
    n=temp-1


if __name__ == '__main__':
    n=100
    lock=lock()
    start_time=time.time()
    for i in range(100):
        t=thread(target=task)
        t.start()
        t.join()
    stop_time=time.time()
    print('主:%s n:%s' %(stop_time-start_time,n))

'''
thread-1 start to run
thread-2 start to run
......
thread-100 start to run
主:350.6937336921692 n:0 #耗时是多么的恐怖
'''

 

九、死锁现象与递归锁——进程和线程都有死锁现象与递归锁

所谓死锁:

是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程,如下就是死锁:

from threading import thread,lock
import time
mutexa=lock()
mutexb=lock()

class mythread(thread):
    def run(self):
        self.func1()
        self.func2()
    def func1(self):
        mutexa.acquire()
        print('\033[41m%s 拿到a锁\033[0m' %self.name)

        mutexb.acquire()
        print('\033[42m%s 拿到b锁\033[0m' %self.name)
        mutexb.release()

        mutexa.release()

    def func2(self):
        mutexb.acquire()
        print('\033[43m%s 拿到b锁\033[0m' %self.name)
        time.sleep(2)

        mutexa.acquire()
        print('\033[44m%s 拿到a锁\033[0m' %self.name)
        mutexa.release()

        mutexb.release()

if __name__ == '__main__':
    for i in range(10):
        t=mythread()
        t.start()

'''
thread-1 拿到a锁
thread-1 拿到b锁
thread-1 拿到b锁
thread-2 拿到a锁
然后就卡住,死锁了
'''

解决方法——递归锁

在python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁rlock。

这个rlock内部维护着一个lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用rlock代替lock,则不会发生死锁:

mutexa=mutexb=threading.rlock() 

#一个线程拿到锁,counter加1,该线程内又碰到加锁的情况,则counter继续加1,这期间所有其他线程都只能等待,等待该线程释放所有锁,即counter递减到0为止

 

十、线程 的类 queue

1、线程queue

queue队列 :使用 import queue,用法与进程queue一样

     (注意区分——进程的queue,进程的队列首字母为大写q,线程的为小写q

                   进程中的导入方法:from multiprocessing import process,queue)

2、主要用法

class queue.queue(maxsize=0  #先进先出

import queue

q=queue.queue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
结果(先进先出):
first
second
third
'''

 

class queue.lifoqueue(maxsize=0   #last in fisrt out 

import queue

q=queue.lifoqueue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
结果(后进先出):
third
second
first
'''

 

class queue.priorityqueue(maxsize=0   #存储数据时可设置优先级的队列

import queue

q=queue.priorityqueue()
#put进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高
q.put((20,'a'))
q.put((10,'b'))
q.put((30,'c'))

print(q.get())
print(q.get())
print(q.get())
'''
结果(数字越小优先级越高,优先级高的优先出队):
(10, 'b')
(20, 'a')
(30, 'c')
'''

 

 

参考资料: