欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Python全栈之协程详解

程序员文章站 2022-06-10 15:06:21
目录1. 线程队列2. 进程池_线程池3. 回调函数4. 协程总结:1. 线程队列# ### 线程队列from queue import queue"""put 存放 超出队列长度阻塞get 获取 超...

1. 线程队列

# ### 线程队列
from queue import queue
"""
put 存放 超出队列长度阻塞
get 获取 超出队列长度阻塞
put_nowait 存放,超出队列长度报错
get_nowait 获取,超出队列长度报错
"""
# (1) queue 
"""先进先出,后进先出"""
q = queue()
q.put(100)
q.put(200)
print(q.get())
# print(q.get())
# print(q.get()) 阻塞
# print(q.get_nowait())
# print(q.get_nowait()) 报错
# queue(3)  =>  指定队列长度, 元素个数只能是3个;
q2 = queue(3)
q2.put(1000)
q2.put(2000)
# q2.put(3000)
# q2.put(4000) 阻塞
q2.put_nowait(6000)
# q2.put_nowait(4000) 报错
# (2) lifoqueue 
"""先进后出,后进先出(栈的特点)"""
from queue import lifoqueue
lq = lifoqueue()
lq.put(110)
lq.put(120)
lq.put(119)
print(lq.get())
print(lq.get())
print(lq.get())
# (3) priorityqueue
"""按照优先级顺序进行排序存放(默认从小到大)"""
"""在一个优先级队列中,要放同一类型的数据,不能混合使用"""
from queue import priorityqueue
pq = priorityqueue()
# 1.对数字进行排序
pq.put(100)
pq.put(19)
pq.put(-90)
pq.put(88)
print(pq.get())
print(pq.get())
print(pq.get())
print(pq.get())
# 2.对字母进行排序 (按照ascii编码)
pq.put("wangwen")
pq.put("sunjian")
pq.put('wangwei')
pq.put("王文")
pq.put("孙坚")
pq.put('王维')
print( pq.get() )
print( pq.get() )
print( pq.get() )
print( pq.get() )
print( pq.get() )
print( pq.get() )
# 3.对容器进行排序
pq.put( (22,"wangwen") )
pq.put( (67,"wangyuhan") )
pq.put( (3,"anxiaodong") )
pq.put( (3,"liuyubo") )
print(pq.get())
print(pq.get())
print(pq.get())
print(pq.get())
# 4.注意点
pq.put(100)
pq.put("nihao")
pq.put( (1,2,3) )

2. 进程池_线程池

知识点:

# 线程池
    # 实例化线程池      threadpoolexcutor    (推荐5*cpu_count)
    # 异步提交任务      submit / map
    # 阻塞直到任务完成   shutdown
    # 获取子线程的返回值 result
    # 使用回调函数      add_done_callback
# 线程池 是由子线程实现的
# 进程池 是由主进程实现的  

程序实现:

# ### 进程池 和 线程池
from concurrent.futures import processpoolexecutor , threadpoolexecutor
import os,time,random
# 获取的逻辑处理器
# print(os.cpu_count())
"""多条进程提前开辟,可触发多cpu的并行效果"""
'''
# (1) 进程池 processpoolexecutor
def func(i):
	# print(i)
	time.sleep(random.uniform(0.1,0.8))
	print(" 任务执行中 ...  start ... 进程号{}".format(os.getpid()) , i )
	print(" 任务执行中 ...  end ... 进程号{}".format(os.getpid()))
	return i
if __name__ == "__main__":
	lst = []
	# (1) 创建进程池对象
	"""默认参数是 系统最大的逻辑核心数 4"""
	p = processpoolexecutor()
	# (2) 异步提交任务
	"""submit(任务,参数1,参数2 ... )"""
	"""默认如果一个进程短时间内可以完成更多的任务,进程池就不会使用更多的进程来辅助完成 , 可以节省系统资源的损耗;"""
	for i in range(10):
		obj = p.submit( func , i )
		# print(obj)
		# print(obj.result()) 不要写在这,导致程序同步,内部有阻塞
		lst.append(obj)
	# (3) 获取当前任务的返回值
	for i in lst:
		print(i.result(),">===获取返回值===?")
	# (4) shutdown 等待所有进程池里的进程执行完毕之后,在放行
	p.shutdown()
	print("进程池结束 ... ")
'''
# (2) threadpoolexecutor
'''
# from threading import currentthread as ct
from threading import current_thread as ct
def func(i):
	print(" 任务执行中 ...  start ... 线程号{}".format( ct().ident ) , i )
	time.sleep(1)
	print(" 任务执行中 ...  end ... 线程号{}".format(os.getpid()))
	return ct().ident  # 线程号
if __name__ == "__main__":
	lst = []
	setvar = set()
	"""默认参数是 系统最大的逻辑核心数 4 * 5 = 20"""
	# (1) 创建线程池对象
	t = threadpoolexecutor() # 20
	# print(t)
	# (2) 异步提交任务
	"""默认如果一个线程短时间内可以完成更多的任务,线程池就不会使用更多的线程来辅助完成 , 可以节省系统资源的损耗;"""
	for i in range(100):
		obj = t.submit(func,i)
		lst.append(obj)
	# (3) 获取当前任务的返回值
	for i in lst:
		setvar.add(i.result())		
	# (4) shutdown 等待所有线程池里的线程执行完毕之后,在放行
	t.shutdown()
	print("主线程执行结束 ... ")	
	print(setvar , len(setvar))
'''
# (3) 线程池 map
from threading import currentthread as ct
from collections import iterator,iterable
def func(i):
	time.sleep(random.uniform(0.1,0.7))
	print("thread ... 线程号{}".format(ct().ident),i)
	return "*" * i
if __name__ == "__main__":
	t = threadpoolexecutor()
	it = t.map(func,range(100))
	# 返回的数据是迭代器
	print(isinstance(it,iterator))
	# 协调子父线程,等待线程池中所有线程执行完毕之后,在放行;
	t.shutdown()
	# 获取迭代器里面的返回值
	for i in it:
		print(i)
"""
# 总结: 无论是进程池还是线程池,都是由固定的进程数或者线程数来执行所有任务
系统不会额外创建多余的进程或者线程来执行任务;
"""

3. 回调函数

知识点:

# 回调函数
    就是一个参数,将这个函数作为参数传到另一个函数里面.
    函数先执行,再执行当参数传递的这个函数,这个参数函数是回调函数

程序实现:

# ### 回调函数
"""
回调函数: 回头调用一下函数获取最后结果
微信支付宝付款成功后, 获取付款金额
微信支付宝退款成功后, 获取退款金额
一般用在获取最后的状态值时,使用回调
通过add_done_callback最后调用一下自定义的回调函数;
"""
from concurrent.futures import processpoolexecutor , threadpoolexecutor
from threading import currentthread as ct
import os,time,random
"""进程任务"""
def func1(i):
	time.sleep(random.uniform(0.1,0.9))
	print(" 进程任务执行中 ...  start ... 进程号{}".format(os.getpid()) , i )
	print(" 进程任务执行中 ...  end ... 进程号{}".format(os.getpid()) )
	return i
def call_back1(obj):
	print(   "<==回调函数的进程号{}==>".format(os.getpid())   )
	print(obj.result())
"""线程任务"""	
def func2(i):
	time.sleep(random.uniform(0.1,0.9))
	print(" 线程任务执行中 ...  start ... 线程号{}".format(ct().ident) , i )
	print(" 线程任务执行中 ...  end ... 线程号{}".format( ct().ident) )
	return i
def call_back2(obj):
	print(   "<==回调函数的线程号{}==>".format(  ct().ident) )
	print(obj.result())
if __name__ == "__main__":
	"""		
	# (1)进程池  结果:(进程池的回调函数由主进程执行)
	p = processpoolexecutor() # os.cpu_count()  => 4
	for i in range(1,11):
		obj = p.submit(func1 , i )
		# 使用add_done_callback在获取最后返回值的时候,可以异步并行
		obj.add_done_callback(call_back1)
		# 直接使用result获取返回值的时候,会变成同步程序,速度慢;
		# obj.result()
	p.shutdown()		
	print(   "主进程执行结束...进程号:"    ,    os.getpid()  )
	"""
	print("<==============================================>")
	# (2)线程池  结果:(线程池的回调函数由子线程执行)
	t = threadpoolexecutor()
	for i in range(1,11):
		obj = t.submit(func2 , i )
		# 使用add_done_callback在获取最后返回值的时候,可以异步并发
		obj.add_done_callback(call_back2)
		# 直接使用result获取返回值的时候,会变成同步程序,速度慢;
		# obj.result()
	t.shutdown()
	print("主线程执行结束 .... 线程号{}".format(ct().ident))
"""
# 原型:
class ceshi():
	def add_done_callback(self,func):
		print("系统执行操作1 ... ")
		print("系统执行操作2 ... ")
		# 回头调用一下
		func(self)
	def result(self):
		return 112233
def call_back(obj):
	print(obj.result())
obj = ceshi()
obj.add_done_callback(call_back)
"""

4. 协程

知识点:

#协程也叫纤程: 协程是线程的一种实现方式.
    指的是一条线程能够在多任务之间来回切换的一种实现.
    对于cpu、操作系统来说,协程并不存在.
    任务之间的切换会花费时间.
    目前电脑配置一般线程开到200会阻塞卡顿.
#协程的实现
协程帮助你记住哪个任务执行到哪个位置上了,并且实现安全的切换
一个任务一旦阻塞卡顿,立刻切换到另一个任务继续执行,保证线程总是忙碌的,更加充分的利用cpu,抢占更多的时间片
# 一个线程可以由多个协程来实现,协程之间不会产生数据安全问题
#协程模块
    # greenlet  gevent的底层,协程,切换的模块
    # gevent    直接用的,gevent能提供更全面的功能

程序实现:

# ### 协程 
"""
进程是资源分配的最小单位
线程是程序调度的最下单位
协程是线程实现的具体方式
总结:
在进程一定的情况下,开辟多个线程,
在线程一定的情况下,创建多个协程,
以便提高更大的并行并发
"""
# (1) 用协程改写生产者消费者模型
"""
def producer():
	for i in range(1000):
		yield i
def consumer(gen):
	for i in range(10):
		print(  next(gen)  )
gen = producer()
consumer(gen)
print("<==========>")
consumer(gen)
print("<==========>")
consumer(gen)
"""
# (2) greenlet 协程的早期版本
from greenlet import greenlet
import time
""" switch 可以切换任务,但是需要手动切换"""
"""
def eat():
	print("eat1")
	g2.switch()
	time.sleep(3)
	print("eat2")
def play():
	print("play1")	
	time.sleep(3)
	print("play2")
	g1.switch()
g1 = greenlet(eat)
g2 = greenlet(play)
g1.switch()
"""
# (3) 升级到gevent版本
"""自动进行任务上的切换,但是不能识别阻塞"""
"""
import gevent
def eat():
	print("eat1")
	gevent.sleep(3)
	# time.sleep(3)
	print("eat2")
def play():
	print("play1")
	gevent.sleep(3)	
	# time.sleep(3)
	print("play2")
# 利用gevent.spawn创建协程对象g1
g1 = gevent.spawn(eat)
# 利用gevent.spawn创建协程对象g2
g2 = gevent.spawn(play)
# 如果不加join, 主线程直接结束任务,不会默认等待协程任务.
# 阻塞,必须等待g1任务完成之后在放行
g1.join()
# 阻塞,必须等待g2任务完成之后在放行
g2.join()
print("主线程执行结束 ....  ")
"""
# (4) 协程的终极版本;
from gevent import monkey;monkey.patch_all()
"""引入猴子补丁,可以实现所有的阻塞全部识别"""
import time
import gevent
def eat():
	print("eat1")
	time.sleep(3)
	print("eat2")
def play():
	print("play1")
	time.sleep(3)
	print("play2")
# 利用gevent.spawn创建协程对象g1
g1 = gevent.spawn(eat)
# 利用gevent.spawn创建协程对象g2
g2 = gevent.spawn(play)
# 如果不加join, 主线程直接结束任务,不会默认等待协程任务.
# 阻塞,必须等待g1任务完成之后在放行
g1.join()
# 阻塞,必须等待g2任务完成之后在放行
g2.join()
print(" 主线程执行结束 ... ")
"""
# 分号,利用分号可以把多行代码放在一行进行编写;
a = 1
b = 2
a = 1;b = 2
"""

==理解:==一个线程上有好多任务,协程可以记住每个任务完成的状态,比如做饭的时候做到一半的时候停下来,去扫地,扫完地之后拐回来做饭,从做到一半的时候开始做。

Python全栈之协程详解

小提示: 下载gevent包,会自带greenlet

Python全栈之协程详解

早期版本的想到在time.sleep执行了两次,每次执行了一秒钟,切换回来有执行了一秒,这是模拟早期版本,模拟堵塞

Python全栈之协程详解

总结:

p.shutdown() 这里的shutdown类似于join
生成器在实例化对象的时候,里面的代码是不走的,调用的时候才有,next
调用等
单线程实现的一种异步并发的一种结构
协程能记住任务的状态

本篇文章就到这里了,希望能够给你带来帮助,也希望您能够多多关注的更多内容!