进程与线程
生产者消费者模型
主要用于解耦
from multiprocessing import queue #队列是安全的,不用加锁. q = queue(num) num : 队列的最大长度 q.get()#阻塞等待获取数据,如果有数据直接获取,如果没有数据,阻塞等待 q.put()#阻塞,如果可以继续往队列中放数据就直接放,不能放就阻塞等待 q.get_nowait()#不阻塞,如果有数据就直接获取,没有数据就报错 q.put_nowait()#不阻塞, 如果能继续往队列中放数据,就直接放,不能放就报错 q = queue(3) q.put(1) q.put('abc') q.put([4,5,6]) print('此时队列已不能再放入了') q.put('呵呵')#此处阻塞等待空位置放入 #q.putnowait('呵呵')#队列已满,不再等待,直接报错 print('此处不会被打印')
print(q.get())#先进先出,先取出 1
print(q.get())
print(q.get())
#print(q.get())#队列为空,取不出会阻塞等待新数据取出
print(q.getnowait())#不再等待直接报错
借助队列实现生产者消费者模型 (队列(first in first out 简称 : fifo) : 先进先出 )
from multiprocessing import queue ,process def consumer(q,name): while 1: pro_info = q.get()#如果消费者不知道生产者停止生产,将会一直等待获取 if pro_info : print('%s拿走了%s' % (name,pro_info)) else:#当收到none时,结束获取,退出程序 break def producer(q,product): for i in range(10): pro_info = product + '的成品%s号' % str(i) q.put(pro_info) q.put(none)#生产者停止生产的标识 if __name__ == '__main__' : q = queue(5)#规定队列最大为5 pro = process(target=producer, args=(q,'版本一')) con = process(target=consumer, args=(q,'小潘')) pro.start() con.start() #把成产表示符放入父进程 from multiprocessing import queue ,process def consumer(q,name): while 1: pro_info = q.get()#如果消费者不知道生产者停止生产,将会一直等待获取 if pro_info : print('%s拿走了%s' % (name,pro_info)) else:#当收到none时,结束获取,退出程序 break def producer(q,product): for i in range(10): pro_info = product + '的成品%s号' % str(i) q.put(pro_info) if __name__ == '__main__' : q = queue(5)#规定队列最大为5 pro = process(target=producer, args=(q,'版本一')) con = process(target=consumer, args=(q,'小潘')) pro.start() con.start() pro.join() q.put(none)#生产者停止生产的标识
#多个生产者消费者
from multiprocessing import queue ,process
def consumer(q,name):
while 1:
pro_info = q.get()#如果消费者不知道生产者停止生产,将会一直等待获取
if pro_info :
print('%s拿走了%s' % (name,pro_info))
else:#当收到none时,结束获取,退出程序
break
def producer(q,product):
for i in range(20):
pro_info = product + '的成品%s号' % str(i)
q.put(pro_info)
if __name__ == '__main__' :
q = queue(5)#规定队列最大为5
pro1 = process(target=producer, args=(q,'版本一'))
pro2 = process(target=producer, args=(q, '版本二'))
pro3 = process(target=producer, args=(q, '版本三'))
con1 = process(target=consumer, args=(q,'小潘'))
con2 = process(target=consumer, args=(q, '李四'))
li = [pro1,pro2,pro3,con1,con2]
[i.start() for i in li]
pro1.join()
pro2.join()
pro3.join()
q.put(none)#生产者停止生产的标识
q.put(none)
joinablequeue模块
from multiprocessing import joinablequeue #继承了multiprocessing.queue 类,新添加了join(),q.task_done() q = joinablequeue() q.join()#等待q.task_done的返回结果 q.task_done()#用于消费者,表示每消费队列中一个数据,就给join返回一个标识 from multiprocessing import joinablequeue ,process def consumer(q,name): while 1: pro_info = q.get()#如果消费者不知道生产者停止生产,将会一直等待获取 if pro_info : print('%s拿走了%s' % (name,pro_info)) q.task_done()#从队列中,每拿走一个数据,就传给join发送一个标识,共十个数据,则十个标识 def producer(q,product): for i in range(10): pro_info = product + '的成品%s号' % str(i) q.put(pro_info) q.join()#记录生产了20个数据在队列中,此时阻塞等待着对列中的所有数据均被拿取 if __name__ == '__main__' : q = joinablequeue(5)#规定队列最大为5 pro = process(target=producer, args=(q,'版本一')) con = process(target=consumer, args=(q,'小潘')) con.daemon = true#把消费者进程设为守护进程,由于主进程等待成产者进程,生产者进程等待消费者进程, # 所以把消费者进程设为守护进程,主进程代码执行完毕,消费者进程结束,则程序结束. pro.start() con.start() pro.join()#等待生产者进程结束
管道
管道是不安全的 , 一般单进程不要用管道
用于多进程之间通信的一种方式
如果在单进程中使用管道,那么就是con1收数据,con2发数据 ; 如果是con1发数据 , con2收数据
如果是多进程中使用管道,那么必须是父进程使用con1收,子进程就必须使用con2发 ;
父进程用con1发 , 子进程必须用con2收 ;
父进程用con2收 , 子进程必须用con1发 ;
父进程用con2收 , 子进程必须用con1收
管道中eoferror错误,是指父进程中如果关闭了发送端,子进程还继续接收数据,就会引
发eoferror错误.
from multiprocessing import pipe,process #单进程下的管道 # con1 , con2 = pipe() # # con1.send('adc') # print(con2.recv()) # con2.send(123) # print(con1.recv()) #多进程 def func(con): con1,con2 = con con1.close() print(con2.recv()) con2.send('主进程con2收') #print(con1.recv())#在同一进程中,con1和con2不能同时开启,否则程序不能关闭 if __name__ == '__main__': con1 , con2 = pipe() p = process(target=func,args=((con1,con2),)) p.start() con2.close() con1.send('子进程con2收')#con1发送,必须是con2接收 print(con1.recv()) def func(con): con1,con2 = con con1.close() con2.send('主进程con2收') while 1 : try : print(con2.recv())#如果父进程不关闭con1管道,则子进程一直阻塞在此处等待接收,报错 except eoferror :#try 一下当报该类型错误时自动执行下面程序 con2.close() break if __name__ == '__main__': con1 , con2 = pipe() p = process(target=func,args=((con1,con2),)) p.start() con2.close() print(con1.recv()) for i in range(10): con1.send('子进程con2收%s' % i)#con1发送,必须是con2接收 con1.close()#发送完毕后,关闭管道
进程之间的共享内存
from multiprocessing import manager , process # m = manager() # num = m.dict({'键':'值'})#数据可以是字典或者其他形式 # num = m.list([1,2,3]) def func(num): num[0] -= 1 print('子进程中的num的值是', num) if __name__ == '__main__': m = manager() num = m.list([1,2,3]) p = process(target=func , args=(num,)) p.start() p.join() print('父进程中的num',num)
进程池
在实际业务中,任务量是有多有少的,如果任务量特别多,不可能要开对应那摩多的进程数,开启那摩多进程首先
需要大量的时间让操作系统来为你管理他,其次还需要消耗大量时间让cpu帮你调度他.
进程池还会帮程序员管理进程池中的进程
进程池 : 一个形象化的池子,里面有给定的进程,这些进程一直处于待命状态,一旦有任务,就有进程去处理.
进程池中的进程都是守护进程,主进程代码执行完毕,守护进程就结束了
from multiprocessing import pool import os import time def func(num): num += 1 print(num) # if __name__ == '__main__': # p = pool(os.cpu_count()+1)#oscpu_count+1 最佳进程数量 # start = time.time() # p.map(func , [i for i in range(20)]) # p.close()#不允许再向进程池中添加任务 # p.join()#等待进程池中所有进程执行完所有任务
#p.apply()#让进程池中的进程同步的做任务 # if __name__ == '__main__': # p = pool(5) # for i in range(20):#同步处理20个任务,同步是指不管进程池中有多少个进程依然一个进程一个进程的执行,不需要join等待和close. # p.apply(func , args=(i ,)) # time.sleep(0.5)
#p.apply_async()#让进程池中的进程异步做任务 if __name__ == '__main__': p = pool(5) l = [] for i in range(20):#异步处理20个任务,异步是指进程池中有几个进程,一下就处理几个任务,那个进程任务处理完了,就接收下一个任务. re = p.apply_async(func , args=(i ,)) l.append(re)
res= [i.get() for i in l]
p.close()#不再接受新的任务,准备关闭
p.join()#等待进程池中所有进程执行任务完毕.
print(res) time.sleep(0.5)
回调函数(只有异步有)
在进程池中的回调函数是父进程调用的,和子进程无关.
from multiprocessing import pool import requests def func(url): re = requests.get() print(re.text) if re.status_code == 200: return url , re.text def call_back(sta):#func函数的返回值,会被回调函数的形参接收, url ,text = sta #print('回调函数',sta) print('回调',url) if __name__ == '__main__': p = pool(4) l = ['https//www.baidu.com', 'https // www.jd.com' 'https // www.taobao' 'https // www.mi.com' 'https // www.bilibili' ] for i in l : p.apply_async(func,args=(i,),call_back=call_back) #异步执行func任务,每一个进程执行完任务,在func中return一个结果,结果会自动被callback指定的函数 #当成形参来接收到. p.close() p.join()
线程
计算机的最小执行单位是线程;
进程是资源分配的基本单位.线程是可执行的基本单位,是可被调度的基本单位.
线程不可以自己独立拥有资源 ,线程的执行必须依赖于所属进程中的资源.
线程被称为轻量级的进程, 线程的切换速度比进程快
进程中必须至少有一个线程.
线程分为用户级和内核级线程
用户级线程 : 对于程序员来说,这样的线程完全被程序员控制执行和调度;
内核级线程 : 对于计算机内核来说 , 这样的线程完全被内核调度.
线程组成 : 代码段 ; 数据段 ; tcb(thread control block)
开启现成的方法
#方法一
from threading import thread import time def func () : print('子线程') time.sleep(1) #if __name__ == '__main__' :#线程中可以不用写这句代码 t = thread(target = func , args=()) t.start()
#方法二
from threading import thread import time
class mythread(thread)
def __init__(self):
super(mythread,self).__init__()
def run(self):
print('我是子线程')
t = mythread()
t.start()
线程和进程的比较
(1) cpu切换进程要比cpu切换线程慢得多
在python中,如果io操作过多,最好使用线程 ;
(2) 在同一个进程中,所有线程共享这个进程的pid,也就是所有线程共享所属进程的资源和内存地址
(3) 在同一个进程内,所有线程共享该进程中的全局变量(各个线程之间的局部变量不能共享)
(4) 关于守护进程与守护线程
守护进程 : 要摸自己正常结束,要摸根据父进程代码的执行结束而结束
守护线程 : 要摸自己正常结束,要摸根据父进程的执行结束而结束
(5) 全局解释器锁 , 只有cpython解释器才有,对于线程来说有了gil,所以没有真正并行,但是有真正的
多进程并行
在cpython中,io密集用多线程,计算密集用多进程
from multiprocessing import process from threading import thread import time def func(): pass if __name__ == '__main__': start = time.time() for i in range(50): p = process(target=func) p.start() print('开50个进程的时间:',time.time() - start) start = time.time() for i in range(50): p = thread(target=func) p.start() print('开50个线程的时间:', time.time() - start)
gil锁
全局解释器锁 , 只有cpython解释器才有,对于线程来说有了gil,所以没有真正并行,但是有真正的
多进程并行
强制线程放弃cpu
在同一时间内它只允许一个线程执行.
当你的任务是计算密集的情况下,使用多进程好
from multiprocessing import process from threading import thread import time,os def func(): global num number = num time.sleep(0.1) #执行此处会等待,gil会令该线程退出执行,允许下一线程进入,这一线程也要等待,同样退出执行,依次循环. #当等待时间结束,第一个线程再次进入,会从上一断点开始执行,直接执行下一步,num = number -1 结果为99, #第二线程同样从上一断点执行,直接执行下一步,num = number -1,结果也为99,以此类推. num = number -1 if __name__ == '__main__': num = 100 t = [] for i in range(50): p = thread(target=func,) p.start() t.append(p) [p.join() for p in t] print(num)
递归锁
rlock 可以有无数把锁,但是只有一把万能钥匙(一把钥匙配若干把锁)
在同一个线程内,递归锁可以无止境的acquire , 但是互斥锁不行
在不同进程内,递归锁是保证只能被一个线程拿到钥匙,然后无止境的acquire,其它线程等待
互斥锁
lock() 一把钥匙配一把锁
一把钥匙配一把锁,主要用于保护数据安全;
共享资源,又叫玲姐资源.
共享带码,又叫临界代码.
对临界资源进行操作时,一定要加锁.
gil : 全局锁
锁的是线程,是cpy解释器上的一把锁,锁的是线程,意思是同一时间只允许一个线程访问cpu
#>>>>>>>死锁 from multiprocessing import process from threading import thread ,lock import time,os def man(m_tot,m_pap): m_tot.acquire()#男的获得厕所资源,把厕所锁上了 print('男在上厕所') time.sleep(1) m_pap.acquire()#男的拿纸资源 print('男的拿到纸资源了') time.sleep(1) print('男的上完厕所了') m_tot.release()#男的还纸资源 m_pap.release()#男的还厕所资源 def woman(m_tot,m_pap): m_pap.acquire() # 女的获得纸资源 print('女的拿到纸资源了') time.sleep(1) m_tot.acquire() # 女的拿厕所资源,把厕所锁上了 print('女在上厕所') time.sleep(1) print('女的上完厕所了') m_tot.release() # 女的还厕所资源 m_pap.release() # 女的还纸资源 if __name__ == '__main__': m_tot = lock() m_pop = lock() m = thread(target=man,args=(m_tot,m_pop)) w = thread(target=woman, args=(m_tot, m_pop)) m.start() w.start() #结果 #男在上厕所 #女的拿到纸资源了
#>>>>>>解决死锁
#>>>>递归锁 : 只有一把钥匙,但是可以开所有锁,层层开锁
from multiprocessing import process from threading import thread ,rlock import time,os def man(m_tot,m_pap): m_tot.acquire()#男的手中有一把钥匙获得厕所资源,把厕所锁上了 print('男在上厕所') time.sleep(1) m_pap.acquire()#男的拿纸资源 print('男的拿到纸资源了') time.sleep(1) print('男的上完厕所了') m_tot.release()#男的还纸资源 m_pap.release()#男的还厕所资源 def woman(m_tot,m_pap): m_pap.acquire() # 女的拿到一把钥匙,获得纸资源 print('女的拿到纸资源了') time.sleep(1) m_tot.acquire() # 女的拿厕所资源,把厕所锁上了 print('女在上厕所') time.sleep(1) print('女的上完厕所了') m_tot.release() # 女的还厕所资源 m_pap.release() # 女的还纸资源 if __name__ == '__main__': m_tot = rlock() m_pop = rlock() m = thread(target=man,args=(m_tot,m_pop)) w = thread(target=woman, args=(m_tot, m_pop)) m.start() w.start()
线程间的通信与进程的用法一样(线程可以不写__main__)
信号量
from threading import semaphore
事件
from threading import event
条件
from threading import condition
条件是让程序员自行去调度线程的一个机制
# condition涉及4个方法
# acquire()
# release()
# wait() 是指让线程阻塞住
# notify(int) 是指给wait发一个信号,让wait变成不阻塞
# int是指,你要给多少给wait发信号
定时器
from threading import timer
timer(time , func )
time :睡眠时间,(秒为单位)
func : 睡眠过后,要执行的函数
from threading import timer def func(): print('定时器') timer(3,func).start()