Python开发【模块】:asyncio
异步asyncio
asyncio是一个使用async / await语法编写并发代码的库。
asyncio用作多个python异步框架的基础,这些框架提供高性能的网络和web服务器,数据库连接库,分布式任务队列等。
asyncio通常非常适合io绑定和高级 结构化网络代码。
asyncio提供了一组高级 api:
- 同时运行python协同程序并完全控制它们的执行;
- 执行网络io和ipc ;
- 控制 ;
- 通过分配任务;
- 并发代码;
此外,还有一些用于库和框架开发人员的低级 api :
-
创建和管理,提供异步api ,运行,处理等;
os signals
- 使用实现有效的协议 ;
- 使用async / await语法基于回调的库和代码。
conroutines
使用async / await语法声明的协同程序是编写asyncio应用程序的首选方法。例如,以下代码片段(需要python 3.7+)打印“hello”,等待1秒,然后打印“world”:
import asyncio
async def main():
print('hello')
await asyncio.sleep(1)
print('world')
asyncio.run(main())
# hello
# world
上面代码等同于下面(不推荐使用基于生成器的协同程序的支持,并计划在python 3.10中删除。)
import asyncio
@asyncio.coroutine
def main():
print('hello')
yield from asyncio.sleep(1)
print('world')
asyncio.run(main())
asyncio实际等同于下面的工作(参数为an asyncio.future, a coroutine or an awaitable is required)
import asyncio
@asyncio.coroutine
def main():
print('hello')
yield from asyncio.sleep(1)
print('world')
# asyncio.run(main())
loop = asyncio.events.new_event_loop()
asyncio.events.set_event_loop(loop)
loop.run_until_complete(main())
# hello
# world
1 this function runs the passed coroutine, taking care of 2 managing the asyncio event loop and finalizing asynchronous 3 generators. 4 5 this function cannot be called when another asyncio event loop is 6 running in the same thread. 7 8 if debug is true, the event loop will be run in debug mode. 9 10 this function always creates a new event loop and closes it at the end. 11 it should be used as a main entry point for asyncio programs, and should 12 ideally only be called once.
实际运行协程asyncio提供了三种主要机制:
1、the asyncio.run()函数来运行顶层入口点“main()”函数(见上面的例子)
2、awaiting on a coroutine 以下代码片段将在等待1秒后打印“hello”,然后在等待另外 2秒后打印“world” :
import asyncio
import time
async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)
async def main():
print(f"started at {time.strftime('%x')}")
await say_after(1, 'hello')
await say_after(2, 'world')
print(f"finished at {time.strftime('%x')}")
asyncio.run(main())
# started at 11:54:48
# hello
# world
# finished at 11:54:51
3、与asyncio同时运行协同程序的功能tasks;
让我们修改上面的例子并同时运行两个say_after
协同程序 :
import asyncio
import time
async def say_after(delay, what):
await asyncio.sleep(delay)
print(f"{what} at {time.strftime('%x')}")
async def main():
task1 = asyncio.create_task(
say_after(1, 'hello'))
task2 = asyncio.create_task(
say_after(2, 'world'))
print(f"started at {time.strftime('%x')}")
# wait until both tasks are completed (should take
# around 2 seconds.)
await task1
await task2
print(f"finished at {time.strftime('%x')}")
asyncio.run(main())
# started at 14:27:22
# hello at 14:27:23
# world at 14:27:24
# finished at 14:27:24
稍微改变一下形式,可以理解的更多
import asyncio
import time
async def say_after(delay, what):
await asyncio.sleep(delay)
print(f"{what} at {time.strftime('%x')}")
async def main():
task1 = asyncio.create_task(
say_after(1, 'hello'))
task2 = asyncio.create_task(
say_after(2, 'world'))
print(f"started at {time.strftime('%x')}")
# wait until both tasks are completed (should take
# around 2 seconds.)
await asyncio.sleep(3)
# await task1
# await task2
print(f"finished at {time.strftime('%x')}")
asyncio.run(main())
# started at 14:29:41
# hello at 14:29:42
# world at 14:29:43
# finished at 14:29:44
awaitables
我们说如果一个对象可以在表达式中使用,那么它就是一个等待对象。许多asyncio api旨在接受等待。
有三种主要类型的等待对象:coroutines, tasks, and futures.
coroutines
python coroutines are awaitables and therefore can be awaited from other coroutines:
import asyncio
async def nested():
return 42
async def main():
# nothing happens if we just call "nested()".
# a coroutine object is created but not awaited,
# so it *won't run at all*.
nested()
# let's do it differently now and await it:
print(await nested()) # will print "42".
asyncio.run(main())
# 42
重要
在本文档中,术语“coroutine”可用于两个密切相关的概念:
- 一个协程功能:一个功能;
- 一个协程对象:通过调用协同程序函数返回的对象 。
tasks
任务用于调度协同程序并发。
当一个协程被包装到一个task中时,会像)
一样 conroutine自动安排很快运行:
import asyncio
async def nested():
return 42
async def main():
# schedule nested() to run soon concurrently
# with "main()".
task = asyncio.create_task(nested())
# "task" can now be used to cancel "nested()", or
# can simply be awaited to wait until it is complete:
await task
asyncio.run(main())
futures
a future
是一个特殊的低级别等待对象,它表示异步操作的最终结果。
当等待 future对象时,它意味着协程将等到future在其他地方解析。
需要asyncio中的未来对象以允许基于回调的代码与async / await一起使用。
通常,不需要在应用程序级代码中创建future对象。
可以等待有时通过库和一些asyncio api公开的未来对象:
async def main():
await function_that_returns_a_future_object()
# this is also valid:
await asyncio.gather(
function_that_returns_a_future_object(),
some_python_coroutine()
)
返回future对象的低级函数的一个很好的例子是。
asyncio方法
1、运行asyncio程序
asyncio.run(coro,*,debug = false )
此函数运行传递的协同程序,负责管理asyncio事件循环并最终确定异步生成器。
当另一个asyncio事件循环在同一个线程中运行时,无法调用此函数。
如果是debugtrue
,则事件循环将以调试模式运行。
此函数始终创建一个新的事件循环并在结束时将其关闭。它应该用作asyncio程序的主要入口点,理想情况下应该只调用一次。
版本3.7中的新功能:重要:此功能已添加到python 3.7中的asyncio中。
2、创建任务
asyncio.create_task(coro)
将coro 包装成a task
并安排执行。返回task对象。
任务在返回的循环中执行,如果当前线程中没有运行循环, runtimeerror
则引发该任务。
python 3.7中添加了此功能。在python 3.7之前,可以使用低级函数:
async def coro():
...
# in python 3.7+
task = asyncio.create_task(coro())
...
# this works in all python versions but is less readable
task = asyncio.ensure_future(coro())
3、sleeping
coroutine asyncio.
sleep
(delay, result=none, *, loop=none)
阻止 delay seconds.。
如果提供了result ,则在协程完成时将其返回给调用者。
leep()
始终挂起当前任务,允许其他任务运行。
该loop 参数已被弃用,并定于去除在python 3.10。
协程示例每秒显示当前日期5秒:
import asyncio
import datetime
async def display_date():
loop = asyncio.get_running_loop()
end_time = loop.time() + 5.0
while true:
print(datetime.datetime.now())
if (loop.time() + 1.0) >= end_time:
break
await asyncio.sleep(1)
asyncio.run(display_date())
4、同时运行任务
awaitable asyncio.
gather
(*aws, loop=none, return_exceptions=false)
同时在aws 序列中运行
如果在aws中任何awaitable 是协程,它将自动安排为任务
如果所有等待成功完成,则结果是返回值的汇总列表。结果值的顺序对应于aws中的等待顺序
如果return_exceptions是false
(默认),则第一个引发的异常会立即传播到等待的任务gather()
。
如果return_exceptions是true
,异常的处理方式一样成功的结果,并在结果列表汇总。
如果gather()
被取消,所有提交的awaitables(尚未完成)也被取消。
如果aws序列中的task or future被取消,则将其视为已引发cancellederror
- 在这种情况下不会取消gather()
呼叫。这是为了防止取消一个提交的tasks/futures 以导致其他任务/期货被取消。
import asyncio
async def factorial(name, number):
f = 1
for i in range(2, number + 1):
print(f"task {name}: compute factorial({i})...")
await asyncio.sleep(1)
f *= i
print(f"task {name}: factorial({number}) = {f}")
async def main():
# schedule three calls *concurrently*:
await asyncio.gather(
factorial("a", 2),
factorial("b", 3),
factorial("c", 4),
)
asyncio.run(main())
# expected output:
#
# task a: compute factorial(2)...
# task b: compute factorial(2)...
# task c: compute factorial(2)...
# task a: factorial(2) = 2
# task b: compute factorial(3)...
# task c: compute factorial(3)...
# task b: factorial(3) = 6
# task c: compute factorial(4)...
# task c: factorial(4) = 24
获取返回结果,异常情况
import asyncio
async def factorial(name, number):
print(name)
if name == 'a':
return name
elif name == 'b':
raise syntaxerror(name)
await asyncio.sleep(number)
async def main():
# schedule three calls *concurrently*:
result = await asyncio.gather(
factorial("a", 2),
factorial("b", 3),
factorial("c", 4),
return_exceptions=true
)
print(result)
asyncio.run(main())
# a
# b
# c
# ['a', syntaxerror('b'), none]
版本3.7中已更改:如果取消了聚集本身,则无论return_exceptions如何,都会传播取消。
5、shielding from cancellation
awaitable asyncio.
shield
(aw, *, loop=none)
protect an from being cancelled
.
if aw is a coroutine it is automatically scheduled as a task.
the statement:
res = await shield(something())
等同于
res = await something()
除非取消包含它的协程,否则something()
不会取消运行的任务。从观点来看something()
,取消没有发生。虽然它的来电者仍然被取消,所以“等待”表达仍然提出了一个cancellederror
。
如果something()
通过其他方式取消(即从内部取消)也会取消shield()
。
如果希望完全忽略取消(不推荐),则该shield()
函数应与try / except子句结合使用,如下所示:
try:
res = await shield(something())
except cancellederror:
res = none
6、超时
coroutine asyncio.
wait_for
(aw, timeout, *, loop=none)
wait for the aw to complete with a timeout.
if aw is a coroutine it is automatically scheduled as a task.
timeout可以是none
或等待的float或int秒数。如果超时是none
,将等到完成
如果发生超时,它将取消任务并加注 asyncio.timeouterror
。
要避免该任务cancellation
,请将其包装。
该函数将一直等到将来实际取消,因此总等待时间可能会超过超时。
如果等待被取消,则未来的aw也会被取消。
该循环参数已被弃用,并定于去除在python 3.10。
async def eternity():
# sleep for one hour
await asyncio.sleep(3600)
print('yay!')
async def main():
# wait for at most 1 second
try:
await asyncio.wait_for(eternity(), timeout=1.0)
except asyncio.timeouterror:
print('timeout!')
asyncio.run(main())
# expected output:
#
# timeout!
改变在3.7版本:当aw被取消,由于超时,wait_for
等待aw被取消。以前,它asyncio.timeouterror
立即提出 。
7、超时原语
coroutine asyncio.
wait
(aws, *, loop=none, timeout=none, return_when=all_completed)
同时运行aws中的 并阻塞,直到return_when指定的条件为止。
如果在aws中任何等待的是协程,它将自动安排为任务。wait()
直接传递协同程序对象 已被弃用,因为它会导致 。
返回两组任务/期货:。(done, pending)
用法:
done, pending = await asyncio.wait(aws)
该循环参数已被弃用,并定于去除在python 3.10。
timeout(浮点数或整数),如果指定,可用于控制返回前等待的最大秒数。
请注意,此功能不会引发asyncio.timeouterror
。超时发生时未完成的期货或任务仅在第二组中返回。
return_when表示此函数何时返回。它必须是以下常量之一:
不变 | 描述 |
---|---|
first_completed |
当任何未来完成或取消时,该函数将返回。 |
first_exception |
当任何未来通过引发异常完成时,函数将返回。如果没有未来引发异常则等同于 all_completed 。
|
all_completed |
所有期货结束或取消时,该功能将返回。 |
不像,wait()
当发生超时不会取消期货。
注意 wait()
将协同程序自动调度为任务,然后在 集合中返回隐式创建的任务对象。因此,以下代码将无法按预期方式工作:(done, pending)
async def foo():
return 42
coro = foo()
done, pending = await asyncio.wait({coro})
if coro in done:
# this branch will never be run!
以下是如何修复上述代码段:
async def foo():
return 42
task = asyncio.create_task(foo())
done, pending = await asyncio.wait({task})
if task in done:
# everything will work as expected now.
wait()
不推荐直接传递协程对象。
8、 scheduling from other threads
asyncio.
run_coroutine_threadsafe
(coro, loop)
将协程提交给给定的事件循环。线程安全的。
返回a concurrent.futures.future
以等待另一个os线程的结果。
此函数旨在从与运行事件循环的os线程不同的os线程调用。例:
# create a coroutine
coro = asyncio.sleep(1, result=3)
# submit the coroutine to a given loop
future = asyncio.run_coroutine_threadsafe(coro, loop)
# wait for the result with an optional timeout argument
assert future.result(timeout) == 3
如果在协程中引发异常,则会通知返回的future。它还可以用于取消事件循环中的任务:
try:
result = future.result(timeout)
except asyncio.timeouterror:
print('the coroutine took too long, cancelling the task...')
future.cancel()
except exception as exc:
print(f'the coroutine raised an exception: {exc!r}')
else:
print(f'the coroutine returned: {result!r}')
请参阅 文档的部分。
与其他asyncio函数不同,此函数需要 显式传递循环参数。
3.5.1版中的新功能。
9、自省
asyncio.
current_task
(loop = none )
返回当前正在运行的task
实例,或者none
没有正在运行的任务。
如果loop是none
用来获取loop。
版本3.7中的新功能。
asyncio.
all_tasks
(loop = none )
返回task
循环运行的一组尚未完成的对象。
如果loop是none
,则用于获取当前循环。
版本3.7中的新功能。
任务对象
class asyncio.
task
(coro,*,loop = none )
a future-like
object that runs a python . not thread-safe.
任务用于在事件循环中运行协同程序。如果一个协程在future上等待,则task暂停执行协程并等待future的完成。当future 完成后,包装协程的执行将恢复。
事件循环使用协作调度:事件循环一次运行一个任务。当一个task等待完成future时,事件循环运行其他任务,回调或执行io操作。
使用高级功能创建任务,或低级别或 功能。不鼓励手动实例化任务。
要取消正在运行的任务,请使用该cancel()
方法。调用它将导致task将cancellederror
异常抛出到包装的协同程序中。如果在取消期间协程正在等待future对象,则future对象将被取消。
cancelled()
可用于检查任务是否被取消。true
如果包装的协程没有抑制cancellederror
异常并且实际上被取消,则该方法返回。
asyncio.task
继承自future
其所有api,除了future.set_result()
和 future.set_exception()
。
任务支持该contextvars
模块。创建任务时,它会复制当前上下文,然后在复制的上下文中运行其协程。
版本3.7中已更改:添加了对contextvars
模块的支持。
cancel
()
请求取消任务。
这会安排cancellederror
在事件循环的下一个循环中将异常抛入包装的协程。
协程则有机会通过抑制异常与清理,甚至拒绝请求... ... ... ... ... ... 块。因此,不同于,不保证任务将被取消,尽管完全抑制取消并不常见,并且积极地不鼓励。exceptcancellederror
future.cancel()
task.cancel()
以下示例说明了协同程序如何拦截取消请求:
async def cancel_me():
print('cancel_me(): before sleep')
try:
# wait for 1 hour
await asyncio.sleep(3600)
except asyncio.cancellederror:
print('cancel_me(): cancel sleep')
raise
finally:
print('cancel_me(): after sleep')
async def main():
# create a "cancel_me" task
task = asyncio.create_task(cancel_me())
# wait for 1 second
await asyncio.sleep(1)
task.cancel()
try:
await task
except asyncio.cancellederror:
print("main(): cancel_me is cancelled now")
asyncio.run(main())
# expected output:
#
# cancel_me(): before sleep
# cancel_me(): cancel sleep
# cancel_me(): after sleep
# main(): cancel_me is cancelled now
cancelled
()-
true
如果任务被取消,则返回。 - 请求取消时取消任务,
cancel()
并且包装的协同程序将cancellederror
异常传播 到其中。
done
()-
true
如果任务完成则返回。 - 一个任务完成时,包裹协程要么返回的值,引发异常,或者任务被取消。
result
()- 返回任务的结果。
- 如果任务完成,则返回包装协程的结果(或者如果协程引发异常,则重新引发该异常。)
- 如果已取消任务,则此方法会引发
cancellederror
异常。 - 如果task的结果尚不可用,则此方法会引发
invalidstateerror
异常。
exception
()- 返回task的例外。
- 如果包装的协同程序引发异常,则返回异常。如果包装的协程正常返回,则此方法返回
none
。 - 如果已取消任务,则此方法会引发
cancellederror
异常。 - 如果尚未完成任务,则此方法会引发
invalidstateerror
异常。
add_done_callback
(回调,*,上下文=无)- 添加要在任务完成时运行的回调。
- 此方法仅应在基于低级回调的代码中使用。
- 有关
future.add_done_callback()
详细信息,请参阅文档。
remove_done_callback
(回调)- 从回调列表中删除回调。
- 此方法仅应在基于低级回调的代码中使用。
- 有关
future.remove_done_callback()
详细信息,请参阅文档。
get_stack
(*,limit = none )- 返回此任务的堆栈帧列表。
- 如果未完成包装的协同程序,则会返回挂起它的堆栈。如果协程已成功完成或被取消,则返回一个空列表。如果协程被异常终止,则返回回溯帧列表。
- 帧始终从最旧到最新排序。
- 对于挂起的协程,只返回一个堆栈帧。
- 可选的limit参数设置要返回的最大帧数; 默认情况下,返回所有可用的帧。返回列表的排序取决于是返回堆栈还是返回:返回堆栈的最新帧,但返回最旧的回溯帧。(这与回溯模块的行为相匹配。)
print_stack
(*,limit = none,file = none )- 打印此任务的堆栈或回溯。
- 这会为检索到的帧生成类似于回溯模块的输出
get_stack()
。 - 该极限参数传递给
get_stack()
直接。 - 的文件参数是其中输出被写入的i / o流; 默认输出写入。
- classmethod
all_tasks
(loop = none ) - 返回一组事件循环的所有任务。
- 默认情况下,返回当前事件循环的所有任务。如果是loop
none
,则该函数用于获取当前循环。 - 不推荐使用此方法,将在python 3.9中删除。请改用此功能。
- classmethod
current_task
(loop = none ) -
返回当前正在运行的任务或
none
。 - 如果是loop
none
,则该函数用于获取当前循环。 - 不推荐使用此方法,将在python 3.9中删除。请改用此功能。