协程与任务¶
本节将简述用于协程与任务的高层级 API。
协程¶
Coroutines declared with the async/await syntax is the preferred way of writing asyncio applications. For example, the following snippet of code prints "hello", waits 1 second, and then prints "world":
>>> import asyncio
>>> async def main():
... print('hello')
... await asyncio.sleep(1)
... print('world')
>>> asyncio.run(main())
hello
world
注意:简单地调用一个协程并不会使其被调度执行
>>> main()
<coroutine object main at 0x1053bb7c8>
要真正运行一个协程,asyncio 提供了三种主要机制:
asyncio.run()
函数用来运行最高层级的入口点 "main()" 函数 (参见上面的示例。)等待一个协程。以下代码段会在等待 1 秒后打印 "hello",然后 再次 等待 2 秒后打印 "world":
import asyncio import time async def say_after(delay, what): await asyncio.sleep(delay) print(what) async def main(): print(f"started at {time.strftime('%X')}") await say_after(1, 'hello') await say_after(2, 'world') print(f"finished at {time.strftime('%X')}") asyncio.run(main())
预期的输出:
started at 17:13:52 hello world finished at 17:13:55
asyncio.create_task()
函数用来并发运行作为 asyncio任务
的多个协程。让我们修改以上示例,并发 运行两个
say_after
协程:async def main(): task1 = asyncio.create_task( say_after(1, 'hello')) task2 = asyncio.create_task( say_after(2, 'world')) print(f"started at {time.strftime('%X')}") # Wait until both tasks are completed (should take # around 2 seconds.) await task1 await task2 print(f"finished at {time.strftime('%X')}")
注意,预期的输出显示代码段的运行时间比之前快了 1 秒:
started at 17:14:32 hello world finished at 17:14:34
可等待对象¶
如果一个对象可以在 await
语句中使用,那么它就是 可等待 对象。许多 asyncio API 都被设计为接受可等待对象。
可等待 对象有三种主要类型: 协程, 任务 和 Future.
协程
Python 协程属于 可等待 对象,因此可以在其他协程中被等待:
import asyncio
async def nested():
return 42
async def main():
# Nothing happens if we just call "nested()".
# A coroutine object is created but not awaited,
# so it *won't run at all*.
nested()
# Let's do it differently now and await it:
print(await nested()) # will print "42".
asyncio.run(main())
任务
任务 被用来“并行的”调度协程
当一个协程通过 asyncio.create_task()
等函数被封装为一个 任务,该协程会被自动调度执行:
import asyncio
async def nested():
return 42
async def main():
# Schedule nested() to run soon concurrently
# with "main()".
task = asyncio.create_task(nested())
# "task" can now be used to cancel "nested()", or
# can simply be awaited to wait until it is complete:
await task
asyncio.run(main())
Futures
Future
是一种特殊的 低层级 可等待对象,表示一个异步操作的 最终结果。
当一个 Future 对象 被等待,这意味着协程将保持等待直到该 Future 对象在其他地方操作完毕。
在 asyncio 中需要 Future 对象以便允许通过 async/await 使用基于回调的代码。
通常情况下 没有必要 在应用层级的代码中创建 Future 对象。
Future 对象有时会由库和某些 asyncio API 暴露给用户,用作可等待对象:
async def main():
await function_that_returns_a_future_object()
# this is also valid:
await asyncio.gather(
function_that_returns_a_future_object(),
some_python_coroutine()
)
一个很好的返回对象的低层级函数的示例是 loop.run_in_executor()
。
创建任务¶
- asyncio.create_task(coro, *, name=None, context=None)¶
将 coro 协程 封装为一个
Task
并调度其执行。返回 Task 对象。name 不为
None
,它将使用Task.set_name()
来设为任务的名称。An optional keyword-only context argument allows specifying a custom
contextvars.Context
for the coro to run in. The current context copy is created when no context is provided.该任务会在
get_running_loop()
返回的循环中执行,如果当前线程没有在运行的循环则会引发RuntimeError
。重要
Save a reference to the result of this function, to avoid a task disappearing mid execution.
3.7 新版功能.
在 3.8 版更改: Added the name parameter.
在 3.11 版更改: Added the context parameter.
休眠¶
- coroutine asyncio.sleep(delay, result=None)¶
阻塞 delay 指定的秒数。
如果指定了 result,则当协程完成时将其返回给调用者。
sleep()
总是会挂起当前任务,以允许其他任务运行。将 delay 设为 0 将提供一个经优化的路径以允许其他任务运行。 这可供长期间运行的函数使用以避免在函数调用的全过程中阻塞事件循环。
以下协程示例运行 5 秒,每秒显示一次当前日期:
import asyncio import datetime async def display_date(): loop = asyncio.get_running_loop() end_time = loop.time() + 5.0 while True: print(datetime.datetime.now()) if (loop.time() + 1.0) >= end_time: break await asyncio.sleep(1) asyncio.run(display_date())
在 3.10 版更改: Removed the loop parameter.
并发运行任务¶
- awaitable asyncio.gather(*aws, return_exceptions=False)¶
并发 运行 aws 序列中的 可等待对象。
如果 aws 中的某个可等待对象为协程,它将自动被作为一个任务调度。
如果所有可等待对象都成功完成,结果将是一个由所有返回值聚合而成的列表。结果值的顺序与 aws 中可等待对象的顺序一致。
如果 return_exceptions 为
False
(默认),所引发的首个异常会立即传播给等待gather()
的任务。aws 序列中的其他可等待对象 不会被取消 并将继续运行。如果 return_exceptions 为
True
,异常会和成功的结果一样处理,并聚合至结果列表。如果
gather()
被取消,所有被提交 (尚未完成) 的可等待对象也会 被取消。如果 aws 序列中的任一 Task 或 Future 对象 被取消,它将被当作引发了
CancelledError
一样处理 -- 在此情况下gather()
调用 不会 被取消。这是为了防止一个已提交的 Task/Future 被取消导致其他 Tasks/Future 也被取消。在 3.10 版更改: Removed the loop parameter.
示例:
import asyncio async def factorial(name, number): f = 1 for i in range(2, number + 1): print(f"Task {name}: Compute factorial({number}), currently i={i}...") await asyncio.sleep(1) f *= i print(f"Task {name}: factorial({number}) = {f}") return f async def main(): # Schedule three calls *concurrently*: L = await asyncio.gather( factorial("A", 2), factorial("B", 3), factorial("C", 4), ) print(L) asyncio.run(main()) # Expected output: # # Task A: Compute factorial(2), currently i=2... # Task B: Compute factorial(3), currently i=2... # Task C: Compute factorial(4), currently i=2... # Task A: factorial(2) = 2 # Task B: Compute factorial(3), currently i=3... # Task C: Compute factorial(4), currently i=3... # Task B: factorial(3) = 6 # Task C: Compute factorial(4), currently i=4... # Task C: factorial(4) = 24 # [2, 6, 24]
备注
如果 return_exceptions 为 False,则在 gather() 被标记为已完成后取消它将不会取消任何已提交的可等待对象。 例如,在将一个异常传播给调用者之后,gather 可被标记为已完成,因此,在从 gather 捕获一个(由可等待对象所引发的)异常之后调用
gather.cancel()
将不会取消任何其他可等待对象。在 3.7 版更改: 如果 gather 本身被取消,则无论 return_exceptions 取值为何,消息都会被传播。
在 3.10 版更改: Removed the loop parameter.
3.10 版后已移除: 如果未提供位置参数或者并非所有位置参数均为 Future 类对象并且没有正在运行的事件循环则会发出弃用警告。
屏蔽取消操作¶
- awaitable asyncio.shield(aw)¶
-
如果 aw 是一个协程,它将自动被作为任务调度。
以下语句:
res = await shield(something())
相当于:
res = await something()
不同之处 在于如果包含它的协程被取消,在
something()
中运行的任务不会被取消。从something()
的角度看来,取消操作并没有发生。然而其调用者已被取消,因此 "await" 表达式仍然会引发CancelledError
。如果通过其他方式取消
something()
(例如在其内部操作) 则shield()
也会取消。如果希望完全忽略取消操作 (不推荐) 则
shield()
函数需要配合一个 try/except 代码段,如下所示:try: res = await shield(something()) except CancelledError: res = None
在 3.10 版更改: Removed the loop parameter.
3.10 版后已移除: 如果 aw 不是 Future 类对象并且没有正在运行的事件循环则会发出弃用警告。
超时¶
- coroutine asyncio.wait_for(aw, timeout)¶
等待 aw 可等待对象 完成,指定 timeout 秒数后超时。
如果 aw 是一个协程,它将自动被作为任务调度。
timeout 可以为
None
,也可以为 float 或 int 型数值表示的等待秒数。如果 timeout 为None
,则等待直到完成。If a timeout occurs, it cancels the task and raises
TimeoutError
.此函数将等待直到 Future 确实被取消,所以总等待时间可能超过 timeout。 如果在取消期间发生了异常,异常将会被传播。
如果等待被取消,则 aw 指定的对象也会被取消。
在 3.10 版更改: Removed the loop parameter.
示例:
async def eternity(): # Sleep for one hour await asyncio.sleep(3600) print('yay!') async def main(): # Wait for at most 1 second try: await asyncio.wait_for(eternity(), timeout=1.0) except TimeoutError: print('timeout!') asyncio.run(main()) # Expected output: # # timeout!
在 3.7 版更改: When aw is cancelled due to a timeout,
wait_for
waits for aw to be cancelled. Previously, it raisedTimeoutError
immediately.在 3.10 版更改: Removed the loop parameter.
简单等待¶
- coroutine asyncio.wait(aws, *, timeout=None, return_when=ALL_COMPLETED)¶
Run
Future
andTask
instances in the aws iterable concurrently and block until the condition specified by return_when.aws 可迭代对象必须不为空。
返回两个 Task/Future 集合:
(done, pending)
。用法:
done, pending = await asyncio.wait(aws)
如指定 timeout (float 或 int 类型) 则它将被用于控制返回之前等待的最长秒数。
Note that this function does not raise
TimeoutError
. Futures or Tasks that aren't done when the timeout occurs are simply returned in the second set.return_when 指定此函数应在何时返回。它必须为以下常数之一:
常量
描述
FIRST_COMPLETED
函数将在任意可等待对象结束或取消时返回。
FIRST_EXCEPTION
函数将在任意可等待对象因引发异常而结束时返回。当没有引发任何异常时它就相当于
ALL_COMPLETED
。ALL_COMPLETED
函数将在所有可等待对象结束或取消时返回。
与
wait_for()
不同,wait()
在超时发生时不会取消可等待对象。在 3.10 版更改: Removed the loop parameter.
在 3.11 版更改: Passing coroutine objects to
wait()
directly is forbidden.
- asyncio.as_completed(aws, *, timeout=None)¶
并发地运行 aws 可迭代对象中的 可等待对象。 返回一个协程的迭代器。 所返回的每个协程可被等待以从剩余的可等待对象的可迭代对象中获得最早的下一个结果。
Raises
TimeoutError
if the timeout occurs before all Futures are done.在 3.10 版更改: Removed the loop parameter.
示例:
for coro in as_completed(aws): earliest_result = await coro # ...
在 3.10 版更改: Removed the loop parameter.
3.10 版后已移除: 如果 aws 可迭代对象中的可等待对象不全为 Future 类对象并且没有正在运行的事件循环则会发出弃用警告。
在线程中运行¶
- coroutine asyncio.to_thread(func, /, *args, **kwargs)¶
在不同的线程中异步地运行函数 func。
向此函数提供的任何 *args 和 **kwargs 会被直接传给 func。 并且,当前
contextvars.Context
会被传播,允许在不同的线程中访问来自事件循环的上下文变量。返回一个可被等待以获取 func 的最终结果的协程。
This coroutine function is primarily intended to be used for executing IO-bound functions/methods that would otherwise block the event loop if they were run in the main thread. For example:
def blocking_io(): print(f"start blocking_io at {time.strftime('%X')}") # Note that time.sleep() can be replaced with any blocking # IO-bound operation, such as file operations. time.sleep(1) print(f"blocking_io complete at {time.strftime('%X')}") async def main(): print(f"started main at {time.strftime('%X')}") await asyncio.gather( asyncio.to_thread(blocking_io), asyncio.sleep(1)) print(f"finished main at {time.strftime('%X')}") asyncio.run(main()) # Expected output: # # started main at 19:50:53 # start blocking_io at 19:50:53 # blocking_io complete at 19:50:54 # finished main at 19:50:54
在任何协程中直接调用 blocking_io() 将会在调用期间阻塞事件循环,导致额外的 1 秒运行时间。 而通过改用 asyncio.to_thread(),我们可以在不同的线程中运行它从而不会阻塞事件循环。
备注
由于 GIL 的存在,asyncio.to_thread() 通常只能被用来将 IO 密集型函数变为非阻塞的。 但是,对于会释放 GIL 的扩展模块或无此限制的替代性 Python 实现来说,asyncio.to_thread() 也可被用于 CPU 密集型函数。
3.9 新版功能.
跨线程调度¶
- asyncio.run_coroutine_threadsafe(coro, loop)¶
向指定事件循环提交一个协程。(线程安全)
返回一个
concurrent.futures.Future
以等待来自其他 OS 线程的结果。此函数应该从另一个 OS 线程中调用,而非事件循环运行所在线程。示例:
# Create a coroutine coro = asyncio.sleep(1, result=3) # Submit the coroutine to a given loop future = asyncio.run_coroutine_threadsafe(coro, loop) # Wait for the result with an optional timeout argument assert future.result(timeout) == 3
如果在协程内产生了异常,将会通知返回的 Future 对象。它也可被用来取消事件循环中的任务:
try: result = future.result(timeout) except TimeoutError: print('The coroutine took too long, cancelling the task...') future.cancel() except Exception as exc: print(f'The coroutine raised an exception: {exc!r}') else: print(f'The coroutine returned: {result!r}')
参见 concurrency and multithreading 部分的文档。
不同与其他 asyncio 函数,此函数要求显式地传入 loop 参数。
3.5.1 新版功能.
内省¶
- asyncio.current_task(loop=None)¶
返回当前运行的
Task
实例,如果没有正在运行的任务则返回None
。如果 loop 为
None
则会使用get_running_loop()
获取当前事件循环。3.7 新版功能.
- asyncio.all_tasks(loop=None)¶
返回事件循环所运行的未完成的
Task
对象的集合。如果 loop 为
None
,则会使用get_running_loop()
获取当前事件循环。3.7 新版功能.
Task 对象¶
- class asyncio.Task(coro, *, loop=None, name=None)¶
一个与
Future 类似
的对象,可运行 Python 协程。非线程安全。Task 对象被用来在事件循环中运行协程。如果一个协程在等待一个 Future 对象,Task 对象会挂起该协程的执行并等待该 Future 对象完成。当该 Future 对象 完成,被打包的协程将恢复执行。
事件循环使用协同日程调度: 一个事件循环每次运行一个 Task 对象。而一个 Task 对象会等待一个 Future 对象完成,该事件循环会运行其他 Task、回调或执行 IO 操作。
使用高层级的
asyncio.create_task()
函数来创建 Task 对象,也可用低层级的loop.create_task()
或ensure_future()
函数。不建议手动实例化 Task 对象。要取消一个正在运行的 Task 对象可使用
cancel()
方法。调用此方法将使该 Task 对象抛出一个CancelledError
异常给打包的协程。如果取消期间一个协程正在等待一个 Future 对象,该 Future 对象也将被取消。cancelled()
可被用来检测 Task 对象是否被取消。如果打包的协程没有抑制CancelledError
异常并且确实被取消,该方法将返回True
。asyncio.Task
从Future
继承了其除Future.set_result()
和Future.set_exception()
以外的所有 API。Task 对象支持
contextvars
模块。当一个 Task 对象被创建,它将复制当前上下文,然后在复制的上下文中运行其协程。在 3.7 版更改: 加入对
contextvars
模块的支持。在 3.8 版更改: Added the name parameter.
3.10 版后已移除: 如果未指定 loop 并且没有正在运行的事件循环则会发出弃用警告。
- cancel(msg=None)¶
请求取消 Task 对象。
这将安排在下一轮事件循环中抛出一个
CancelledError
异常给被封包的协程。协程在之后有机会进行清理甚至使用
try
... ...except CancelledError
...finally
代码块抑制异常来拒绝请求。不同于Future.cancel()
,Task.cancel()
不保证 Task 会被取消,虽然抑制完全取消并不常见,也很不鼓励这样做。在 3.9 版更改: Added the msg parameter.
Deprecated since version 3.11, will be removed in version 3.14: msg parameter is ambiguous when multiple
cancel()
are called with different cancellation messages. The argument will be removed.以下示例演示了协程是如何侦听取消请求的:
async def cancel_me(): print('cancel_me(): before sleep') try: # Wait for 1 hour await asyncio.sleep(3600) except asyncio.CancelledError: print('cancel_me(): cancel sleep') raise finally: print('cancel_me(): after sleep') async def main(): # Create a "cancel_me" Task task = asyncio.create_task(cancel_me()) # Wait for 1 second await asyncio.sleep(1) task.cancel() try: await task except asyncio.CancelledError: print("main(): cancel_me is cancelled now") asyncio.run(main()) # Expected output: # # cancel_me(): before sleep # cancel_me(): cancel sleep # cancel_me(): after sleep # main(): cancel_me is cancelled now
- cancelled()¶
如果 Task 对象 被取消 则返回
True
。当使用
cancel()
发出取消请求时 Task 会被 取消,其封包的协程将传播被抛入的CancelledError
异常。
- done()¶
如果 Task 对象 已完成 则返回
True
。当 Task 所封包的协程返回一个值、引发一个异常或 Task 本身被取消时,则会被认为 已完成。
- result()¶
返回 Task 的结果。
如果 Task 对象 已完成,其封包的协程的结果会被返回 (或者当协程引发异常时,该异常会被重新引发。)
如果 Task 对象 被取消,此方法会引发一个
CancelledError
异常。如果 Task 对象的结果还不可用,此方法会引发一个
InvalidStateError
异常。
- exception()¶
返回 Task 对象的异常。
如果所封包的协程引发了一个异常,该异常将被返回。如果所封包的协程正常返回则该方法将返回
None
。如果 Task 对象 被取消,此方法会引发一个
CancelledError
异常。如果 Task 对象尚未 完成,此方法将引发一个
InvalidStateError
异常。
- add_done_callback(callback, *, context=None)¶
添加一个回调,将在 Task 对象 完成 时被运行。
此方法应该仅在低层级的基于回调的代码中使用。
要了解更多细节请查看
Future.add_done_callback()
的文档。
- remove_done_callback(callback)¶
从回调列表中移除 callback 。
此方法应该仅在低层级的基于回调的代码中使用。
要了解更多细节请查看
Future.remove_done_callback()
的文档。
- get_stack(*, limit=None)¶
返回此 Task 对象的栈框架列表。
如果所封包的协程未完成,这将返回其挂起所在的栈。如果协程已成功完成或被取消,这将返回一个空列表。如果协程被一个异常终止,这将返回回溯框架列表。
框架总是从按从旧到新排序。
每个被挂起的协程只返回一个栈框架。
可选的 limit 参数指定返回框架的数量上限;默认返回所有框架。返回列表的顺序要看是返回一个栈还是一个回溯:栈返回最新的框架,回溯返回最旧的框架。(这与 traceback 模块的行为保持一致。)
- print_stack(*, limit=None, file=None)¶
打印此 Task 对象的栈或回溯。
此方法产生的输出类似于 traceback 模块通过
get_stack()
所获取的框架。limit 参数会直接传递给
get_stack()
。file 参数是输出所写入的 I/O 流;默认情况下输出会写入
sys.stderr
。
- get_name()¶
返回 Task 的名称。
如果没有一个 Task 名称被显式地赋值,默认的 asyncio Task 实现会在实例化期间生成一个默认名称。
3.8 新版功能.