欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Python: 进阶系列之五:并发编程:异步IO(asyncio) 协程(coroutine)与任务(task)的使用

程序员文章站 2022-05-08 08:44:32
...

1. 协程(coroutine)的概念

根据Wikipedia, “协程是非抢先多任务的一般子例程,通过允许多个入口点用于在某些位置挂起和恢复执行的计算机程序组件”。这是一种相当技术的说法,简单来说就是函数的内部可以中断,转而去执行其他的函数,并且可以保留前一函数的状态,等在适当的时候再返回来接着执行前一函数,看起来同时像在做多件事情。人也是可以同时做多件事的,如果把协程比作一个人的话,他想去泡茶,在等待水烧开的同时可以去洗茶壶、可以写代码,一旦水开了,我就可以回来继续泡茶了。相当于JavaScript中的Promise.

    协程的特点在于是始终只有一个线程执行

如果想利用多核CPU,最好是使用多进程+协程。

2. 并发(concurrency)编程的三种方式及比较

  • 多进程(multiprocessing)
  • 线程(threading)
  • 协程(coroutine)

多线程和多进程对IO的调度主要取决于系统,而协程的方式,调度来自用户。与线程相比,协程有着极高的执行效率,没有线程切换的开销,协程中控制共享资源不加锁,也不存在同时写变量的冲突。

Python由于众所周知的GIL的原因,导致其线程无法发挥多核的并行计算能力(当然,后来有了multiprocessing,可以实现多进程并行),显得比较鸡肋。既然在GIL之下,同一时刻只能有一个线程在运行,那么对于CPU密集的程序来说,线程之间的切换开销就成了拖累,而以I/O为瓶颈的程序正是协程所擅长的: 多任务并发(非并行),每个任务在合适的时候挂起(发起I/O)和恢复(I/O结束)  。

即然协程这么牛逼,本文就只讲它了

3. 协程,一个简单的实现

协程通过 async/await 语法进行声明,如下

import asyncio
from datetime import datetime

async def main():
    print('hello', datetime.now())
    await asyncio.sleep(1)
    print('world',datetime.now())

# 注意这里:如果你是在pycharm或vscode里面运行代码的话,请使用如下代码
# asyncio.run(main())  #它相当于一个同步方法的一个入口函数,用到的非常多

# 如果你像我一样在jupyter里面运行代码,可以直接使用如下代码,因为jupyter(IPython)已经是在一个事件循环里运行了。
# 官网的说法:This function cannot be called when another asyncio event loop is running in the same thread.You can now use async/await at the top level in the IPython terminal and in the notebook, it should — in most of the cases — “just work”. Update IPython to version 7+, IPykernel to version 5+, and you’re off to the races.
await main() 

运行结果如下:

Python: 进阶系列之五:并发编程:异步IO(asyncio) 协程(coroutine)与任务(task)的使用

如果直接运行main()方法,它会打印出main()返回得是一个协程对象(coroutine object)

main() # <coroutine object main at 0x0000019B892CB2C8>

async 和await 不必要在一个方法中成对出现(在C#中,它们必须成对出现),也就是说只要在方法的前面加上async 关键字,它就是一个协程对象(coroutine object),相当于一个Promise对象,执行它的时候不会立即返回执行的结果。

python3.5以上的版本,可以使用async/await来定义协程的关键字,如果你浏览过以下关键字或函数,请忽略它,因为它们都过时了或者是低层级的实现,如果你没听说过,那更好,没有历史包袱。除非你想使用旧版本的python,或者使用更底层的方法完成复杂的功能,本文是基于目前最新的python 3.7版本。

  • @asyncio.coroutine
  • yield from
  • loop.ensure_future()

4.  可等待对象

能使用await语句的都是可等待对象,python中有三种:

  • 任务(Task):一个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含任务的各种状态。不建议手动实例化Task
  • 协程(Coroutine) : 协程对象,指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用。
  • Future:是一个特殊的低层级的可等待对象,通常情况,没必要创建Future对象。

5. 并发运行任务gether()

使用gather并发运行任务,gether的方法如下:

    awaitable asyncio.gather(*aws, loop=None, return_exceptions=False)

  • 所有的可等等待对象会被自动打包成Task对象
  • 如果return_exceptions = False(默认),  引发的首个异常会立即传播给等待 gather() 的任务, 也就是说,会立即抛错,未执行的Task不会继续执行。
  • 如果return_exceptions = True, 异常会和成功的结果一样处理,并聚合至结果列表.
  • 如果gather()被取消,所有未完成的任务也会被取消
import asyncio
import threading


async def greet(name, delay):
    i = 1
    thread_name = threading.currentThread().name
    await asyncio.sleep(delay)
    print(f'Hello, {name}, i:{i}, thread name:{thread_name}, current time:{datetime.now()}')
    i += 1
    return name


async def main():
    thread_name = threading.currentThread().name
    print(f'start:: thread name:{thread_name}, current time:{datetime.now()}')

    task3 = asyncio.create_task(greet('CCC',3)) # 手动将协程greet('CCC',3)打包成Task对象

    result = await asyncio.gather(
        greet('AAA', 1), # 自动被打包成Task对象
        greet('BBB', 2), # 自动被打包成Task对象
        task3,  
    )

    print(result, type(result))

    print(f'end:: thread name:{thread_name}, current time:{datetime.now()}')

await main()

结果如下,可以看到,线程始终只有一个,名字叫MainThread,i 始终为1,所以不存在同时写变量的冲突,总时间只花费了3秒(由最长的task的执行时间决定),如果使用同步的方式来写的话,时间需要花费6秒(1+2+3)。

Python: 进阶系列之五:并发编程:异步IO(asyncio) 协程(coroutine)与任务(task)的使用

 如果return_exceptions = True, 异常会和成功的结果一样处理,并聚合至结果列表.

async def greet(name, delay):
    i = 1
    thread_name = threading.currentThread().name
    await asyncio.sleep(delay)
    print(
        f'Hello, {name}, i:{i}, thread name:{thread_name}, current time:{datetime.now()}')
    i += 1

    if name == 'BBB':
        1/0  # 这里让它除0,抛错

    return name


async def main():
    thread_name = threading.currentThread().name
    print(f'start:: thread name:{thread_name}, current time:{datetime.now()}')

    task3 = asyncio.create_task(greet('CCC', 3))

    result = await asyncio.gather(
        greet('AAA', 1),
        greet('BBB', 2),
        task3,
        return_exceptions=True
    )  # 这里 return_exceptions=True, 异常会和成功的结果一样处理,并聚合至结果列表

    print(result, type(result))

    print(f'end:: thread name:{thread_name}, current time:{datetime.now()}')

await main()

 结果如下:Python: 进阶系列之五:并发编程:异步IO(asyncio) 协程(coroutine)与任务(task)的使用

6. 取消任务

如果一个任务由于执行的时间过长,还没有返回结果,我们可以手动将它取消

async def greet(name, delay):
    try:
        await asyncio.sleep(delay)
        print(f'Hello, {name}, current time:{datetime.now()}')

        return name
    except asyncio.CancelledError as error:
        print('CancelledError 被捕获了')
        raise #继续向上抛出

async def main():
    print(f'start:: current time:{datetime.now()}')

    task = asyncio.create_task(greet('AAA', 5))  # 创建一个任务,需要5秒才能执行完
    await asyncio.sleep(2)
    task.cancel()  # 在第二秒时就cancel它

    await asyncio.sleep(1)
    print('是否被cancel了:', task.cancelled())  # True
    print('是否结束了:', task.done())  # True
        
    # print('结果是啥:', task.result())  # 因为task异常了,直接执行它会抛出异常
    # print('异常是啥:', task.exception())  # 会抛出task的异常

    print(f'end:: current time:{datetime.now()}')

await main()

 结果显示如下:

Python: 进阶系列之五:并发编程:异步IO(asyncio) 协程(coroutine)与任务(task)的使用

上面的except中,如果将rasie这一行注释掉,即不继续向上抛出错误,会怎么样呢?结果如下:

Python: 进阶系列之五:并发编程:异步IO(asyncio) 协程(coroutine)与任务(task)的使用

也就是说greet()方法可以通过try/except来控制是否取消Task, 也就是说Task.cancel()不能保证Task被取消。

7.  超时处理

async def greet(name, delay):
    try:
        await asyncio.sleep(delay)
        print(f'Hello, {name}, current time:{datetime.now()}')

        return name
    except asyncio.TimeoutError as error:
        print('TimeoutError 永远捕获不了你')
        raise  # 既然捕获不了你,抛出也没用
    except asyncio.CancelledError as error:
        print('CancelledError 居然在这里可以捕获你,人生处处是惊喜呀')
        raise # 即使不向上抛,main函数中的TimeoutError依然会被捕获


async def main():
    print(f'start:: current time:{datetime.now()}')

    task = asyncio.create_task(greet('AAA', 5))  # 创建一个任务,需要5秒才能执行完

    try:
        await asyncio.wait_for(task, 2) # 两秒等不到你,我就不等了

    except asyncio.TimeoutError as error:
        print('TimeoutError 被捕获了')

    print('是否被cancel了:', task.cancelled())  # True,可以看出因为超时,task会被cancel
    print('是否结束了:', task.done())  # True

    # print('结果是啥:', task.result())  # 因为task异常了,直接执行它会抛出异常
    # print('异常是啥:', task.exception())  # 会抛出task的异常

    print(f'end:: current time:{datetime.now()}')

await main()

结果如下:

Python: 进阶系列之五:并发编程:异步IO(asyncio) 协程(coroutine)与任务(task)的使用

8. 并发运行任务的另一种实现asyncio.wait()

coroutine asyncio.wait(aws*loop=Nonetimeout=Nonereturn_when=ALL_COMPLETED)

return_when 指定此函数应在何时返回。它必须为以下常数之一:

常数

描述

FIRST_COMPLETED

函数将在任意可等待对象结束或取消时返回。

FIRST_EXCEPTION

函数将在任意可等待对象因引发异常而结束时返回。当没有引发任何异常时它就相当于 ALL_COMPLETED

ALL_COMPLETED

函数将在所有可等待对象结束或取消时返回。

async def greet(name):
    result = await asyncio.sleep(1, f'Hello,{name}')
    return result

async def main():
    task = asyncio.create_task(greet('AAA'))
    done, pending = await asyncio.wait({task, greet('BBB')})

    # 运行会报错
    # if task in pending:
    #     print('这段代码将会运行1')

    if task in done:
        print('这段代码将会运行2')

await main()

 9. 并发运行任务的比较:asyncio.gather()与asyncio.wait()

  • 两种都能实现并发
  • asyncio.gather():是一种高层级的用法,它自动帮我们收集好了返回的结果,我们通常使用它
  • asyncio.wait():是一种低层级的用法,你可以对task作更多的控制,手动收集返回的结果

参考链接