同步原语¶
源代码: Lib/asyncio/locks.py
asyncio 同步原语被设计为与 threading
模块的类似,但有两个关键注意事项:
asyncio 原语不是线程安全的,因此它们不应被用于 OS 线程同步 (而应当使用
threading
);这些同步原语的方法不接受 timeout 参数;请使用
asyncio.wait_for()
函数来执行带有超时的操作。
asyncio 具有下列基本同步原语:
Lock¶
- class asyncio.Lock¶
实现一个用于 asyncio 任务的互斥锁。 非线程安全。
asyncio 锁可被用来保证对共享资源的独占访问。
使用 Lock 的推荐方式是通过
async with
语句:lock = asyncio.Lock() # ... later async with lock: # access shared state
这等价于:
lock = asyncio.Lock() # ... later await lock.acquire() try: # access shared state finally: lock.release()
在 3.10 版更改: Removed the loop parameter.
- coroutine acquire()¶
获取锁。
此方法会等待直至锁为 unlocked,将其设为 locked 并返回
True
。当有一个以上的协程在
acquire()
中被阻塞则会等待解锁,最终只有一个协程会被执行。锁的获取是 公平的: 被执行的协程将是第一个开始等待锁的协程。
- release()¶
释放锁。
当锁为 locked 时,将其设为 unlocked 并返回。
如果锁为 unlocked,则会引发
RuntimeError
。
- locked()¶
如果锁为 locked 则返回
True
。
事件¶
- class asyncio.Event¶
事件对象。 该对象不是线程安全的。
asyncio 事件可被用来通知多个 asyncio 任务已经有事件发生。
Event 对象会管理一个内部旗标,可通过
set()
方法将其设为 true 并通过clear()
方法将其重设为 false。wait()
方法会阻塞直至该旗标被设为 true。 该旗标初始时会被设为 false。在 3.10 版更改: Removed the loop parameter.
示例:
async def waiter(event): print('waiting for it ...') await event.wait() print('... got it!') async def main(): # Create an Event object. event = asyncio.Event() # Spawn a Task to wait until 'event' is set. waiter_task = asyncio.create_task(waiter(event)) # Sleep for 1 second and set the event. await asyncio.sleep(1) event.set() # Wait until the waiter task is finished. await waiter_task asyncio.run(main())
- set()¶
设置事件。
所有等待事件被设置的任务将被立即唤醒。
- is_set()¶
如果事件已被设置则返回
True
。
Condition¶
- class asyncio.Condition(lock=None)¶
条件对象。 该对象不是线程安全的。
asyncio 条件原语可被任务用于等待某个事件发生,然后获取对共享资源的独占访问。
在本质上,Condition 对象合并了
Event
和Lock
的功能。 多个 Condition 对象有可能共享一个 Lock,这允许关注于共享资源的特定状态的不同任务实现对共享资源的协同独占访问。可选的 lock 参数必须为
Lock
对象或None
。 在后一种情况下会自动创建一个新的 Lock 对象。在 3.10 版更改: Removed the loop parameter.
使用 Condition 的推荐方式是通过
async with
语句:cond = asyncio.Condition() # ... later async with cond: await cond.wait()
这等价于:
cond = asyncio.Condition() # ... later await cond.acquire() try: await cond.wait() finally: cond.release()
- coroutine acquire()¶
获取下层的锁。
此方法会等待直至下层的锁为 unlocked,将其设为 locked 并返回 returns
True
。
- notify(n=1)¶
唤醒最多 n 个正在等待此条件的任务(默认为 1 个)。 如果没有任务正在等待则此方法为空操作。
锁必须在此方法被调用前被获取并在随后被快速释放。 如果通过一个 unlocked 锁调用则会引发
RuntimeError
。
- locked()¶
如果下层的锁已被获取则返回
True
。
- notify_all()¶
唤醒所有正在等待此条件的任务。
此方法的行为类似于
notify()
,但会唤醒所有正在等待的任务。锁必须在此方法被调用前被获取并在随后被快速释放。 如果通过一个 unlocked 锁调用则会引发
RuntimeError
。
- release()¶
释放下层的锁。
当在未锁定的锁上发起调用时,会引发
RuntimeError
。
- coroutine wait()¶
等待直至收到通知。
当此方法被调用时如果调用方任务未获得锁,则会引发
RuntimeError
。这个方法会释放下层的锁,然后保持阻塞直到被
notify()
或notify_all()
调用所唤醒。 一旦被唤醒,Condition 会重新获取它的锁并且此方法将返回True
。
- coroutine wait_for(predicate)¶
等待直到目标值变为 true。
目标必须为一个可调用对象,其结果将被解读为一个布尔值。 最终的值将为返回值。
Semaphore¶
- class asyncio.Semaphore(value=1)¶
信号量对象。 该对象不是线程安全的。
信号量会管理一个内部计数器,该计数器会随每次
acquire()
调用递减并随每次release()
调用递增。 计数器的值永远不会降到零以下;当acquire()
发现其值为零时,它将保持阻塞直到有某个任务调用了release()
。可选的 value 参数用来为内部计数器赋初始值 (默认值为
1
)。 如果给定的值小于0
则会引发ValueError
。在 3.10 版更改: Removed the loop parameter.
使用 Semaphore 的推荐方式是通过
async with
语句。:sem = asyncio.Semaphore(10) # ... later async with sem: # work with shared resource
这等价于:
sem = asyncio.Semaphore(10) # ... later await sem.acquire() try: # work with shared resource finally: sem.release()
- locked()¶
如果信号量对象无法被立即获取则返回
True
。
- release()¶
释放一个信号量对象,将内部计数器的值加一。 可以唤醒一个正在等待获取信号量对象的任务。
不同于
BoundedSemaphore
,Semaphore
允许执行的release()
调用多于acquire()
调用。
BoundedSemaphore¶
- class asyncio.BoundedSemaphore(value=1)¶
绑定的信号量对象。 该对象不是线程安全的。
BoundedSemaphore 是特殊版本的
Semaphore
,如果在release()
中内部计数器值增加到初始 value 以上它将引发一个ValueError
。在 3.10 版更改: Removed the loop parameter.
Barrier¶
- class asyncio.Barrier(parties, action=None)¶
A barrier object. Not thread-safe.
A barrier is a simple synchronization primitive that allows to block until parties number of tasks are waiting on it. Tasks can wait on the
wait()
method and would be blocked until the specified number of tasks end up waiting onwait()
. At that point all of the waiting tasks would unblock simultaneously.async with
can be used as an alternative to awaiting onwait()
.The barrier can be reused any number of times.
示例:
async def example_barrier(): # barrier with 3 parties b = asyncio.Barrier(3) # create 2 new waiting tasks asyncio.create_task(b.wait()) asyncio.create_task(b.wait()) await asyncio.sleep(0) print(b) # The third .wait() call passes the barrier await b.wait() print(b) print("barrier passed") await asyncio.sleep(0) print(b) asyncio.run(example_barrier())
Result of this example is:
<asyncio.locks.Barrier object at 0x... [filling, waiters:2/3]> <asyncio.locks.Barrier object at 0x... [draining, waiters:0/3]> barrier passed <asyncio.locks.Barrier object at 0x... [filling, waiters:0/3]>
3.11 新版功能.
- coroutine wait()¶
Pass the barrier. When all the tasks party to the barrier have called this function, they are all unblocked simultaneously.
When a waiting or blocked task in the barrier is cancelled, this task exits the barrier which stays in the same state. If the state of the barrier is "filling", the number of waiting task decreases by 1.
The return value is an integer in the range of 0 to
parties-1
, different for each task. This can be used to select a task to do some special housekeeping, e.g.:... async with barrier as position: if position == 0: # Only one task print this print('End of *draining phasis*')
This method may raise a
BrokenBarrierError
exception if the barrier is broken or reset while a task is waiting. It could raise aCancelledError
if a task is cancelled.
- coroutine reset()¶
Return the barrier to the default, empty state. Any tasks waiting on it will receive the
BrokenBarrierError
exception.If a barrier is broken it may be better to just leave it and create a new one.
- coroutine abort()¶
Put the barrier into a broken state. This causes any active or future calls to
wait()
to fail with theBrokenBarrierError
. Use this for example if one of the taks needs to abort, to avoid infinite waiting tasks.
- parties¶
The number of tasks required to pass the barrier.
- n_waiting¶
The number of tasks currently waiting in the barrier while filling.
- broken¶
A boolean that is
True
if the barrier is in the broken state.
- exception asyncio.BrokenBarrierError¶
This exception, a subclass of
RuntimeError
, is raised when theBarrier
object is reset or broken.
在 3.9 版更改: 使用 await lock
或 yield from lock
以及/或者 with
语句 (with await lock
, with (yield from lock)
) 来获取锁的操作已被移除。 请改用 async with lock
。