Python 进阶学习笔记之五:网络和进程间通信之异步 IO
Python 进阶系列笔记第四篇,前置文章链接:
Python 进阶学习笔记之一:内置常用类型及方法
Python 进阶学习笔记之二:常用数据类型(上)
Python 进阶学习笔记之三:常用数据类型(下)
Python 进阶学习笔记之四:高效迭代器工具
16. asyncio - 异步 IO
asyncio 模块用于支持异步 IO 编程和并发编程,这个模块是从 3.4
版本引入的标准库,在后续的几个小版本中一直在增强,3.7 中也增加了一些高层次的 API,本文会略有介绍。关于并发编程,在任何高级语言中都是一门很深奥的知识分支,本笔记进作为入门知识,如果要深入研究,请参阅专门的专题或书籍。
Python 3.6 版本后 asyncio
模块已经代替了原来的 asyncore
模块(异步Sockt 处理)和 asynchat
模块(异步 Socket 指令),因此3.6+版本的 python 推荐使用此模块进行异步执行和异步通信的编码实现。
16.1 协程与任务
入门使用
在 Python 中,一个异步执行单元叫做 协程
,协程通过 async/await 语法进行声明,是编写异步应用的推荐方式。
简单示例:
import asyncio
async def main():
print('hello')
await asyncio.sleep(1)
print('world')
asyncio.run(main()) # 注意,上面的协程函数 main 直接调用并不会执行
要真正运行一个协程,asyncio 提供了三种主要机制:
-
asyncio.run()
函数用来运行最高层级的入口点 “main()” 函数 (参见上面的示例) - 等待一个协程,即用
await
来修饰一个协程,需要注意的是await
只能用于函数内部,因此上面的例子如果要改用await
的话,还需要稍加修改async def run_main(): await main() asyncio.run(run_main())
- 使用
asyncio.create_task()
函数用来并发运行多个协程import asyncio async def foo(thread_name): c = 1 while c < 5: print(f"{thread_name}: {c}") c = c + 1 await asyncio.sleep(1) async def main_run(): t1 = asyncio.create_task(foo("t1")) t2 = asyncio.create_task(foo("t2")) # 3.7+ 版本推荐使用 create_task 创建 task t3 = asyncio.ensure_future(foo("t3")) # 3.7 以前版本使用此方法创建 task await t1 await t2 await t3 asyncio.run(main_run())
task 的目的之一就是支持 并发 执行多个 协程,当一个协程通过 asyncio.create_task()
等函数被打包为一个 任务,该协程将自动排入日程准备立即运行。
上面列子中还需要关注的点:
-
asyncio.run(coro, *, debug=False)
:运行 asyncio 程序入口,如果 debug 参数为 True,事件循环将以调试模式运行。 -
asyncio.create_task(coro)
:将 coro 协程 打包为一个 Task 排入日程准备执行。返回 Task 对象。此函数 在 Python 3.7 中被加入。在 Python 3.7 之前,可以改用低层级的asyncio.ensure_future()
函数。 -
asyncio.sleep(delay, result=None, *, loop=None)
: 阻塞 delay 指定的秒数,也就是 休眠 ,注意这个函数本身是个协程函数,需要使用await
进行调用。
并发运行任务
函数 asyncio.gather(*aws, loop=None, return_exceptions=False)
用来支持并发 task 的执行,第一个参数支持多个协程对象,如果所有可等待(协程)对象都成功完成,结果将是一个由所有返回值聚合而成的列表,结果值的顺序与 aws 中可等待对象的顺序一致。
示例:
import asyncio
async def factorial(name, number):
f = 1
for i in range(2, number + 1):
print(f"Task {name}: Compute factorial({i})...")
await asyncio.sleep(1)
f *= i
print(f"Task {name}: factorial({number}) = {f}")
async def main():
await asyncio.gather(
factorial("A", 2),
factorial("B", 3),
factorial("C", 4),
)
asyncio.run(main())
# 输出
#
# Task A: Compute factorial(2)...
# Task B: Compute factorial(2)...
# Task C: Compute factorial(2)...
# Task A: factorial(2) = 2
# Task B: Compute factorial(3)...
# Task C: Compute factorial(3)...
# Task B: factorial(3) = 6
# Task C: Compute factorial(4)...
# Task C: factorial(4) = 24
超时支持
有两种实现方式:
1.asyncio.wait_for(aw, timeout, *, loop=None)
:等待 aw 可等待对象 完成,指定 timeout 秒数后超时。timeout 可以为 None,也可以为 float 或 int 型数值表示的等待秒数。如果 timeout 为 None,则等待直到完成。如果发生超时,任务将取消并引发 asyncio.TimeoutError。需要特别注意的是,函数将等待直到目标对象确实被取消,所以总等待时间可能超过 timeout 指定的秒数。
```python
async def eternity():
await asyncio.sleep(3600)
print(‘yay!’)
async def main():
try:
await asyncio.wait_for(eternity(), timeout=1.0)
except asyncio.TimeoutError:
print('timeout!')
asyncio.run(main())
# 输出:
# timeout!
```
-
asyncio.wait(task, *, loop=None, timeout=None, return_when=ALL_COMPLETED)
:简单等待方式,并发运行指定的task
并阻塞线程直到满足 return_when 指定的条件。async def foo(): return 42 task = asyncio.create_task(foo()) done, pending = await asyncio.wait({task}) if task in done pass
return_when 指定此函数应在何时返回。它必须为以下常数之一:
常量 | 描述 |
---|---|
FIRST_COMPLETED | 函数将在任意可等待对象结束或取消时返回。 |
FIRST_EXCEPTION | 函数将在任意可等待对象因引发异常而结束时返回。当没有引发任何异常时它就相当于 ALL_COMPLETED。 |
ALL_COMPLETED | 函数将在所有可等待对象结束或取消时返回。 |
-
asyncio.as_completed(aws, *, loop=None, timeout=None)
:并发地运行 aws 集合中的 可等待对象。返回一个 Future 对象的迭代器。返回的每个 Future 对象代表来自剩余可等待对象集合的最早结果。如果在所有 Future 对象完成前发生超时则将引发 asyncio.TimeoutError。for f in as_completed(aws): earliest_result = await f
16.2 同步原语 (Synchronization Primitives)
asyncio 中协程多个 Task 的执行顺序是不可预测的,为了支持并发,asyncio 模块提供了一些低级的同步原子操作,这些同步原语和模块 threading
中的同步原语作用类似,需要注意的是,asyncio 中的同步原语不是线程安全的。
- Lock:为 asyncio 的 task 实现的一种互斥锁。
import asyncio async def foo(m_lock, name): async with m_lock: loop = 1 while loop < 5: print(name) await asyncio.sleep(1) loop += 1 async def main(): lock = asyncio.Lock() await asyncio.gather(foo(lock, "A"), foo(lock, "B"),) asyncio.run(main())
- Event:asyncio event 是用来通知多个 task,某个事件已经发生。
import asyncio async def waiter(event): print('waiting for it ...') await event.wait() print('... got it!') async def main(): # Create an Event object. event = asyncio.Event() # Spawn a Task to wait until 'event' is set. waiter_task = asyncio.create_task(waiter(event)) # Sleep for 1 second and set the event. await asyncio.sleep(1) event.set() # Wait until the waiter task is finished. await waiter_task asyncio.run(main())
- Condition:作用和上面的
event
类似,区别是Condition
可以选择通知的协程数量import asyncio async def consumer(con, name): async with con: print(f"{name} is blocked") await con.wait() print(f"{name} is alive") async def active_condition(con): for i in range(1, 3): async with con: print(f"** {i} 个 task") con.notify(i) await asyncio.sleep(2) async with con: print("**所有task") con.notify_all() async def main(): con = asyncio.Condition() task_list = [] for i in range(1, 7): task_list.append(asyncio.create_task(consumer(con, i))) task_list.append(asyncio.create_task(active_condition(con))) await asyncio.wait(task_list) asyncio.run(main())
- Semaphore:信号量。信号量内部维护一个计数器,表示最多允许同时被放行的 task 的数量
import asyncio async def consumer(seq, name): async with seq: print(f"{name} 开始执行") await asyncio.sleep(2) print(f"{name} 执行完成") async def main(): seq = asyncio.Semaphore(2) # 同时允许两个 task 执行 task_list = [] for i in range(1, 5): task_list.append(asyncio.create_task(consumer(seq, i))) await asyncio.wait(task_list) asyncio.run(main()) # 输出 1 开始执行 2 开始执行 1 执行完成 2 执行完成 3 开始执行 4 开始执行 3 执行完成 4 执行完成
如果信号量的内部计数器设置为默认1 seq = asyncio.Semaphore(1)
,输出将会是下面的样子,一个一个的执行:
1 开始执行
1 执行完成
2 开始执行
2 执行完成
3 开始执行
3 执行完成
4 开始执行
4 执行完成
16.3 asyncio 队列集
asyncio 队列被设计成与标准库 queue
模块类似。尽管 asyncio 队列不是线程安全的,但是他们是被设计专用于 async/await 代码。
asyncio 队列中有三种类型,详细介绍如下:
- asyncio.Queue(maxsize=0, *, loop=None):先进,先出队列(FIFO),如果maxsize设置了大于0的值,取出和放入都会被阻塞。
- asyncio.PriorityQueue:优先队列,按优先级取出项目(优先级值越小,约先取出)
- asyncio.LifoQueue:后进显出队列
""" Queue 使用示例 """ import asyncio async def consumer(name, queue): while True: obj = await queue.get() # get 没有元素后会永远阻塞在这里,还有一个非阻塞方法 get_nowait(),不过在队列为空是调用非阻塞方法会引发异常 QueueEmpty print(f"{name} got the {obj}") queue.task_done() size = queue.qsize() # 可以获取队列的当前长度 async def producer(queue): for i in range(1, 11): queue.put_nowait(i) # 放置方法还有一个阻塞方法 put,在队列设置 maxsize 并且满后使用 put 会阻塞 await asyncio.sleep(2) async def main(): queue = asyncio.Queue() tasks = [] for i in range(1, 3): task = asyncio.create_task(consumer(f'worker-{i}', queue)) tasks.append(task) p_task = asyncio.create_task(producer(queue)) tasks.append(p_task) await asyncio.gather(*tasks) asyncio.run(main())
""" LifoQueue 使用示例 """ async def consumer(name, queue): while True: await asyncio.sleep(4) obj = await queue.get() print(f"{name} get value {obj}") queue.task_done() async def producer(queue): for i in range(1, 11): queue.put_nowait(i) print(f"set value {i}") await asyncio.sleep(1) async def main(): queue = asyncio.LifoQueue() tasks = [] for i in range(1, 3): task = asyncio.create_task(consumer(f'worker-{i}', queue)) tasks.append(task) p_task = asyncio.create_task(producer(queue)) tasks.append(p_task) await asyncio.gather(*tasks) asyncio.run(main()) # 输出 set value 1 set value 2 set value 3 set value 4 worker-1 get value 4 worker-2 get value 3 set value 5 set value 6 set value 7 set value 8 worker-1 get value 8 worker-2 get value 7 set value 9 set value 10 worker-1 get value 10 worker-2 get value 9 worker-1 get value 6 worker-2 get value 5 worker-1 get value 2 worker-2 get value 1
""" PriorityQueue 优先队列使用 """ import asyncio import random async def consumer(name, queue): while True: await asyncio.sleep(5) p_num, obj = await queue.get() # get 到的值分两部分,第一个是优先级值,第二个是数据对象 print(f"{name} get value {p_num}, {obj}") queue.task_done() async def producer(queue): for i in range(1, 6): num = random.randint(1, 100) queue.put_nowait((num, i,)) # put 的值是元祖形式,第一个值是优先级,第二个是数据对象 print(f"set value {num}") await asyncio.sleep(1) async def main(): queue = asyncio.PriorityQueue() tasks = [] for i in range(1, 3): task = asyncio.create_task(consumer(f'worker-{i}', queue)) tasks.append(task) p_task = asyncio.create_task(producer(queue)) tasks.append(p_task) await asyncio.gather(*tasks) asyncio.run(main())
16.4 流
流是用于处理网络连接的高级 async/await-ready 原语,允许发送和接收数据。流 操作主要有以下几个函数组成:
-
asyncio.open_connection(host=None, port=None, *, loop=None, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None)
:建立网络连接并返回一对 (reader, writer) 对象,返回的 reader 和 writer 对象是 StreamReader 和 StreamWriter 类的实例。 -
asyncio.start_server(client_connected_cb, host=None, port=None, *, loop=None, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, start_serving=True)
:启动套接字服务。client_connected_cb
回调函数会在一个连接建立后被调用,同样返回一对 reader 和 writer 对象。 -
asyncio.open_unix_connection(path=None, *, loop=None, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None)
:作用和上面的open_connections
一样,区别是只适用于 Unix 平台。 -
asyncio.start_unix_server(client_connected_cb, path=None, *, loop=None, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, start_serving=True)
:作用和上面的start_server
,区别是只适用于 Unix 平台。
StreamReader 对象
用于从 IO 流中读取数据的IO对象,不推荐直接实例创建,而是从上面的方法的返回值中获取。
常用方法:
- read(n=-1):从流中读取 n 个字节,如果 n=-1,则返回全部字节。
- readline():从流中读取一行,即使用 ‘\n’ 结尾的行内容。
- readexactly(n):精确读取n个字节,如果流中字节不够,则会触发
IncompleteReadError
异常。 - readuntil(separator=b’\n’):读取内容直到指定的分隔符,默认还是 ‘\n’。
StreamWriter 对象
用于向 IO 连接中写入数据的对象,不推荐直接实例创建,而是从上面的方法的返回值中获取。
常用方法:
-
write(data):写入数据到流中。
-
writelines(data):写入一个序列的字节内容到流中。
-
close()
-
is_closing()
示例:
""" 服务端 """ import asyncio async def handle_echo(reader, writer): data = await reader.read(100) message = data.decode() addr = writer.get_extra_info('peername') print(f"Received {message!r} from {addr!r}") print(f"Send: {message!r}") writer.write(data) await writer.drain() print("Close the connection") writer.close() async def main(): server = await asyncio.start_server( handle_echo, '127.0.0.1', 8888) addr = server.sockets[0].getsockname() print(f'Serving on {addr}') async with server: await server.serve_forever() asyncio.run(main())
""" 客户端 """ import asyncio async def tcp_echo_client(message): reader, writer = await asyncio.open_connection( '127.0.0.1', 8888) print(f'Send: {message!r}') writer.write(message.encode()) data = await reader.read(100) print(f'Received: {data.decode()!r}') print('Close the connection') writer.close() asyncio.run(tcp_echo_client('Hello World!'))
对于上面的各种用法,要有一个认识基础,上面协程和 Task 的执行过程其实是单线程运行的,属于一种内部调度并发(需要理解并行与并发的区别),需要谨慎使用甚至不用全局变量和引用,这也是所有语言的并发编程中减少错误的最有效手段。
16.5 低级别异步 API
上面四小节是 asyncio 高级API,是3.7版本新加入的特性,相对应的对于3.7版本之前,还有一组低层次 API,来实现相关功能,官方已经建议开发者尽量使用高层次 API,因此低层次 API这里不在展开。另外需要说明的是,网上大部分文章对于 asyncio 模块的介绍都是基于低层次 API 的,参考前需要注意。
上一篇: 在CSS中使用when/else的方法