欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Python 进阶学习笔记之五:网络和进程间通信之异步 IO

程序员文章站 2022-06-04 09:08:52
...

Python 进阶系列笔记第四篇,前置文章链接:
Python 进阶学习笔记之一:内置常用类型及方法
Python 进阶学习笔记之二:常用数据类型(上)
Python 进阶学习笔记之三:常用数据类型(下)
Python 进阶学习笔记之四:高效迭代器工具

16. asyncio - 异步 IO

asyncio 模块用于支持异步 IO 编程和并发编程,这个模块是从 3.4版本引入的标准库,在后续的几个小版本中一直在增强,3.7 中也增加了一些高层次的 API,本文会略有介绍。关于并发编程,在任何高级语言中都是一门很深奥的知识分支,本笔记进作为入门知识,如果要深入研究,请参阅专门的专题或书籍。

Python 3.6 版本后 asyncio模块已经代替了原来的 asyncore模块(异步Sockt 处理)和 asynchat模块(异步 Socket 指令),因此3.6+版本的 python 推荐使用此模块进行异步执行和异步通信的编码实现。

16.1 协程与任务

入门使用

在 Python 中,一个异步执行单元叫做 协程,协程通过 async/await 语法进行声明,是编写异步应用的推荐方式。
简单示例:

import asyncio

async def main():
     print('hello')
     await asyncio.sleep(1)
     print('world')
     
asyncio.run(main())       #  注意,上面的协程函数 main 直接调用并不会执行

要真正运行一个协程,asyncio 提供了三种主要机制:

  1. asyncio.run()函数用来运行最高层级的入口点 “main()” 函数 (参见上面的示例)
  2. 等待一个协程,即用 await来修饰一个协程,需要注意的是 await只能用于函数内部,因此上面的例子如果要改用 await的话,还需要稍加修改
    async def run_main():
    	await main()
    	
    asyncio.run(run_main())
    
  3. 使用 asyncio.create_task()函数用来并发运行多个协程
    import asyncio
    
     async def foo(thread_name):
     	c = 1
     	while c < 5:
         	print(f"{thread_name}: {c}")
         	c = c + 1
         	await asyncio.sleep(1)
    
    
     async def main_run():
     	t1 = asyncio.create_task(foo("t1"))
     	t2 = asyncio.create_task(foo("t2"))      # 3.7+ 版本推荐使用 create_task 创建 task
    
     	t3 = asyncio.ensure_future(foo("t3"))   # 3.7 以前版本使用此方法创建 task
    
     	await t1
     	await t2
     	await t3
    
     asyncio.run(main_run()) 
    

task 的目的之一就是支持 并发 执行多个 协程,当一个协程通过 asyncio.create_task()等函数被打包为一个 任务,该协程将自动排入日程准备立即运行。

上面列子中还需要关注的点:

  • asyncio.run(coro, *, debug=False):运行 asyncio 程序入口,如果 debug 参数为 True,事件循环将以调试模式运行。
  • asyncio.create_task(coro):将 coro 协程 打包为一个 Task 排入日程准备执行。返回 Task 对象。此函数 在 Python 3.7 中被加入。在 Python 3.7 之前,可以改用低层级的 asyncio.ensure_future()函数。
  • asyncio.sleep(delay, result=None, *, loop=None): 阻塞 delay 指定的秒数,也就是 休眠 ,注意这个函数本身是个协程函数,需要使用 await进行调用。

并发运行任务

函数 asyncio.gather(*aws, loop=None, return_exceptions=False)用来支持并发 task 的执行,第一个参数支持多个协程对象,如果所有可等待(协程)对象都成功完成,结果将是一个由所有返回值聚合而成的列表,结果值的顺序与 aws 中可等待对象的顺序一致。
示例:

import asyncio

async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print(f"Task {name}: Compute factorial({i})...")
        await asyncio.sleep(1)
        f *= i
    print(f"Task {name}: factorial({number}) = {f}")

async def main():
    await asyncio.gather(
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
    )

asyncio.run(main())

# 输出
#
#     Task A: Compute factorial(2)...
#     Task B: Compute factorial(2)...
#     Task C: Compute factorial(2)...
#     Task A: factorial(2) = 2
#     Task B: Compute factorial(3)...
#     Task C: Compute factorial(3)...
#     Task B: factorial(3) = 6
#     Task C: Compute factorial(4)...
#     Task C: factorial(4) = 24

超时支持

有两种实现方式:
1.asyncio.wait_for(aw, timeout, *, loop=None):等待 aw 可等待对象 完成,指定 timeout 秒数后超时。timeout 可以为 None,也可以为 float 或 int 型数值表示的等待秒数。如果 timeout 为 None,则等待直到完成。如果发生超时,任务将取消并引发 asyncio.TimeoutError。需要特别注意的是,函数将等待直到目标对象确实被取消,所以总等待时间可能超过 timeout 指定的秒数。
```python
async def eternity():
await asyncio.sleep(3600)
print(‘yay!’)

async def main():
    try:
        await asyncio.wait_for(eternity(), timeout=1.0)
    except asyncio.TimeoutError:
        print('timeout!')

asyncio.run(main())

#  输出:
#  timeout!
```
  1. asyncio.wait(task, *, loop=None, timeout=None, return_when=ALL_COMPLETED):简单等待方式,并发运行指定的 task并阻塞线程直到满足 return_when 指定的条件。
    async def foo():
        return 42
    
    task = asyncio.create_task(foo())
    done, pending = await asyncio.wait({task})
    
    if task in done
    	pass
    

return_when 指定此函数应在何时返回。它必须为以下常数之一:

常量 描述
FIRST_COMPLETED 函数将在任意可等待对象结束或取消时返回。
FIRST_EXCEPTION 函数将在任意可等待对象因引发异常而结束时返回。当没有引发任何异常时它就相当于 ALL_COMPLETED。
ALL_COMPLETED 函数将在所有可等待对象结束或取消时返回。
  1. asyncio.as_completed(aws, *, loop=None, timeout=None):并发地运行 aws 集合中的 可等待对象。返回一个 Future 对象的迭代器。返回的每个 Future 对象代表来自剩余可等待对象集合的最早结果。如果在所有 Future 对象完成前发生超时则将引发 asyncio.TimeoutError。
    for f in as_completed(aws):
        earliest_result = await f
    

16.2 同步原语 (Synchronization Primitives)

asyncio 中协程多个 Task 的执行顺序是不可预测的,为了支持并发,asyncio 模块提供了一些低级的同步原子操作,这些同步原语和模块 threading中的同步原语作用类似,需要注意的是,asyncio 中的同步原语不是线程安全的。

  1. Lock:为 asyncio 的 task 实现的一种互斥锁。
    import asyncio
    
    async def foo(m_lock, name):
    	async with m_lock:
        	loop = 1
        	while loop < 5:
           		print(name)
    
            	await asyncio.sleep(1)
            	loop += 1
    
    async def main():
    	lock = asyncio.Lock()
    	await asyncio.gather(foo(lock, "A"), foo(lock, "B"),)
    
    asyncio.run(main())
    
  2. Event:asyncio event 是用来通知多个 task,某个事件已经发生。
    import asyncio
    async def waiter(event):
        print('waiting for it ...')
        await event.wait()
        print('... got it!')
    
    async def main():
        # Create an Event object.
        event = asyncio.Event()
    
        # Spawn a Task to wait until 'event' is set.
        waiter_task = asyncio.create_task(waiter(event))
    
        # Sleep for 1 second and set the event.
        await asyncio.sleep(1)
        event.set()
    
        # Wait until the waiter task is finished.
        await waiter_task
    
    asyncio.run(main())
    
  3. Condition:作用和上面的 event类似,区别是 Condition可以选择通知的协程数量
    import asyncio
    
    async def consumer(con, name):
        async with con:
            print(f"{name} is blocked")
            await con.wait()
            print(f"{name} is alive")
    
    
    async def active_condition(con):
        for i in range(1, 3):
            async with con:
                print(f"** {i} 个 task")
                con.notify(i)
                await asyncio.sleep(2)
    
        async with con:
            print("**所有task")
            con.notify_all()
    
    
    async def main():
        con = asyncio.Condition()
        task_list = []
        for i in range(1, 7):
            task_list.append(asyncio.create_task(consumer(con, i)))
    
        task_list.append(asyncio.create_task(active_condition(con)))
        await asyncio.wait(task_list)
    
    asyncio.run(main())
    
  4. Semaphore:信号量。信号量内部维护一个计数器,表示最多允许同时被放行的 task 的数量
    import asyncio
    
    async def consumer(seq, name):
        async with seq:
            print(f"{name} 开始执行")
            await asyncio.sleep(2)
            print(f"{name} 执行完成")
    
    async def main():
        seq = asyncio.Semaphore(2)         # 同时允许两个 task 执行
        task_list = []
        for i in range(1, 5):
            task_list.append(asyncio.create_task(consumer(seq, i)))
    
        await asyncio.wait(task_list)
    
    asyncio.run(main())
    
    # 输出
    1 开始执行
    2 开始执行
    1 执行完成
    2 执行完成
    3 开始执行
    4 开始执行
    3 执行完成
    4 执行完成
    

如果信号量的内部计数器设置为默认1 seq = asyncio.Semaphore(1),输出将会是下面的样子,一个一个的执行:

1 开始执行
1 执行完成
2 开始执行
2 执行完成
3 开始执行
3 执行完成
4 开始执行
4 执行完成

16.3 asyncio 队列集

asyncio 队列被设计成与标准库 queue模块类似。尽管 asyncio 队列不是线程安全的,但是他们是被设计专用于 async/await 代码。
asyncio 队列中有三种类型,详细介绍如下:

  1. asyncio.Queue(maxsize=0, *, loop=None):先进,先出队列(FIFO),如果maxsize设置了大于0的值,取出和放入都会被阻塞。
  2. asyncio.PriorityQueue:优先队列,按优先级取出项目(优先级值越小,约先取出)
  3. asyncio.LifoQueue:后进显出队列
    """
    Queue 使用示例
    """
    import asyncio
    
    async def consumer(name, queue):
        while True:
            obj = await queue.get()            # get 没有元素后会永远阻塞在这里,还有一个非阻塞方法 get_nowait(),不过在队列为空是调用非阻塞方法会引发异常 QueueEmpty
            print(f"{name} got the {obj}")
            queue.task_done()
            
            size = queue.qsize()               # 可以获取队列的当前长度
    
    async def producer(queue):
        for i in range(1, 11):
            queue.put_nowait(i)               # 放置方法还有一个阻塞方法 put,在队列设置 maxsize 并且满后使用 put 会阻塞
            await asyncio.sleep(2)
    
    async def main():
        queue = asyncio.Queue()
        tasks = []
    
        for i in range(1, 3):
            task = asyncio.create_task(consumer(f'worker-{i}', queue))
            tasks.append(task)
    
        p_task = asyncio.create_task(producer(queue))
        tasks.append(p_task)
    
        await asyncio.gather(*tasks)
    
    asyncio.run(main())
    
    
    """
    LifoQueue 使用示例
    """
    async def consumer(name, queue):
        while True:
            await asyncio.sleep(4)
            obj = await queue.get()
            print(f"{name} get value {obj}")
            queue.task_done()
    
    
    async def producer(queue):
        for i in range(1, 11):
            queue.put_nowait(i)
            print(f"set value {i}")
            await asyncio.sleep(1)
    
    async def main():
        queue = asyncio.LifoQueue()
    
        tasks = []
    
        for i in range(1, 3):
            task = asyncio.create_task(consumer(f'worker-{i}', queue))
            tasks.append(task)
    
        p_task = asyncio.create_task(producer(queue))
        tasks.append(p_task)
    
        await asyncio.gather(*tasks)
    
    asyncio.run(main())
    
    # 输出
    set value 1
    set value 2
    set value 3
    set value 4
    worker-1 get value 4
    worker-2 get value 3
    set value 5
    set value 6
    set value 7
    set value 8
    worker-1 get value 8
    worker-2 get value 7
    set value 9
    set value 10
    worker-1 get value 10
    worker-2 get value 9
    worker-1 get value 6
    worker-2 get value 5
    worker-1 get value 2
    worker-2 get value 1
    
    """
    PriorityQueue 优先队列使用
    """
    import asyncio
    import random
    
    
    async def consumer(name, queue):
        while True:
            await asyncio.sleep(5)
            p_num, obj = await queue.get()             # get 到的值分两部分,第一个是优先级值,第二个是数据对象
            print(f"{name} get value {p_num}, {obj}")
            queue.task_done()
    
    
    async def producer(queue):
        for i in range(1, 6):
            num = random.randint(1, 100)
            queue.put_nowait((num, i,))           # put 的值是元祖形式,第一个值是优先级,第二个是数据对象
            print(f"set value {num}")
            await asyncio.sleep(1)
    
    async def main():
        queue = asyncio.PriorityQueue()
    
        tasks = []
    
        for i in range(1, 3):
            task = asyncio.create_task(consumer(f'worker-{i}', queue))
            tasks.append(task)
    
        p_task = asyncio.create_task(producer(queue))
        tasks.append(p_task)
    
        await asyncio.gather(*tasks)
    
    asyncio.run(main())
    

16.4 流

流是用于处理网络连接的高级 async/await-ready 原语,允许发送和接收数据。流 操作主要有以下几个函数组成:

  1. asyncio.open_connection(host=None, port=None, *, loop=None, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None):建立网络连接并返回一对 (reader, writer) 对象,返回的 reader 和 writer 对象是 StreamReader 和 StreamWriter 类的实例。
  2. asyncio.start_server(client_connected_cb, host=None, port=None, *, loop=None, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, start_serving=True):启动套接字服务。client_connected_cb回调函数会在一个连接建立后被调用,同样返回一对 reader 和 writer 对象。
  3. asyncio.open_unix_connection(path=None, *, loop=None, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None):作用和上面的 open_connections一样,区别是只适用于 Unix 平台。
  4. asyncio.start_unix_server(client_connected_cb, path=None, *, loop=None, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, start_serving=True):作用和上面的 start_server,区别是只适用于 Unix 平台。

StreamReader 对象

用于从 IO 流中读取数据的IO对象,不推荐直接实例创建,而是从上面的方法的返回值中获取。
常用方法:

  1. read(n=-1):从流中读取 n 个字节,如果 n=-1,则返回全部字节。
  2. readline():从流中读取一行,即使用 ‘\n’ 结尾的行内容。
  3. readexactly(n):精确读取n个字节,如果流中字节不够,则会触发 IncompleteReadError异常。
  4. readuntil(separator=b’\n’):读取内容直到指定的分隔符,默认还是 ‘\n’。

StreamWriter 对象

用于向 IO 连接中写入数据的对象,不推荐直接实例创建,而是从上面的方法的返回值中获取。
常用方法:

  1. write(data):写入数据到流中。

  2. writelines(data):写入一个序列的字节内容到流中。

  3. close()

  4. is_closing()

    示例:

    """
    服务端
    """
    import asyncio
    
    async def handle_echo(reader, writer):
        data = await reader.read(100)
        message = data.decode()
        addr = writer.get_extra_info('peername')
    
        print(f"Received {message!r} from {addr!r}")
    
        print(f"Send: {message!r}")
        writer.write(data)
        await writer.drain()
    
        print("Close the connection")
        writer.close()
    
    async def main():
        server = await asyncio.start_server(
            handle_echo, '127.0.0.1', 8888)
    
        addr = server.sockets[0].getsockname()
        print(f'Serving on {addr}')
    
        async with server:
            await server.serve_forever()
    
    asyncio.run(main())
    
    """
    客户端
    """
    import asyncio
    
    async def tcp_echo_client(message):
        reader, writer = await asyncio.open_connection(
            '127.0.0.1', 8888)
    
        print(f'Send: {message!r}')
        writer.write(message.encode())
    
        data = await reader.read(100)
        print(f'Received: {data.decode()!r}')
    
        print('Close the connection')
        writer.close()
    
    asyncio.run(tcp_echo_client('Hello World!'))
    

对于上面的各种用法,要有一个认识基础,上面协程和 Task 的执行过程其实是单线程运行的,属于一种内部调度并发(需要理解并行与并发的区别),需要谨慎使用甚至不用全局变量和引用,这也是所有语言的并发编程中减少错误的最有效手段。

16.5 低级别异步 API

上面四小节是 asyncio 高级API,是3.7版本新加入的特性,相对应的对于3.7版本之前,还有一组低层次 API,来实现相关功能,官方已经建议开发者尽量使用高层次 API,因此低层次 API这里不在展开。另外需要说明的是,网上大部分文章对于 asyncio 模块的介绍都是基于低层次 API 的,参考前需要注意。

相关标签: Python 进阶