@aryan-f
Created January 26, 2021 08:21
Python asyncio synchronization primitive, similar to Event, but one call to set() lets each task waiting on it run only once. I implemented this for my own program to trigger several tasks from a common dependency.
import asyncio
import collections


class Trigger:
    """Asynchronous synchronization primitive for triggering groups of tasks.

    The class borrows much of its implementation from `asyncio.Event`. However, it gets rid of the internal flag, so
    that whenever a coroutine waits on it, it can only be awakened by an explicit "new" call to `set()`. If a
    coroutine waits on the trigger in a loop, one call to `set()` lets it run for a single iteration.
    """

    def __init__(self):
        # Futures of the coroutines currently blocked in `wait()`.
        self._waiters = collections.deque()

    def __repr__(self):
        res = super().__repr__()
        extra = f' [waiters: {len(self._waiters)}]' if self._waiters else ''
        return f'<{res[1:-1]}{extra}>'

    def set(self):
        """
        All coroutines waiting for the trigger will be awakened, but a subsequent `wait()` on the trigger will require
        another `set()` to unlock.
        """
        for future in self._waiters:
            if not future.done():
                future.set_result(True)

    async def wait(self):
        """
        Blocks until the trigger is set. Once it is set, it will return `True`.
        """
        # Look up the loop here rather than in `__init__`, so the trigger can be created before a loop is running.
        future = asyncio.get_running_loop().create_future()
        self._waiters.append(future)
        try:
            await future
            return True
        finally:
            # Also runs on cancellation, so no stale future is left behind.
            self._waiters.remove(future)
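The "one call to `set()`, one iteration" behaviour described in the docstring can be sketched with a small usage example. This is only an illustration, not part of the original gist: `worker`, `demo`, and `log` are hypothetical names, and the compact `Trigger` below repeats the logic of the class above (looking up the running loop inside `wait()`) so that the snippet runs on its own.

```python
import asyncio
import collections


class Trigger:
    """Compact copy of the class above, so this sketch is self-contained."""

    def __init__(self):
        self._waiters = collections.deque()

    def set(self):
        # Wake every coroutine that is currently blocked in wait().
        for future in self._waiters:
            if not future.done():
                future.set_result(True)

    async def wait(self):
        future = asyncio.get_running_loop().create_future()
        self._waiters.append(future)
        try:
            await future
            return True
        finally:
            self._waiters.remove(future)


async def worker(name, trigger, log):
    # Hypothetical worker: every pass through the loop needs a fresh set().
    while True:
        await trigger.wait()
        log.append(name)


async def demo():
    trigger = Trigger()
    log = []
    tasks = [asyncio.create_task(worker(n, trigger, log)) for n in ("a", "b")]
    await asyncio.sleep(0)  # let both workers reach wait()

    for _ in range(2):
        trigger.set()           # releases each waiting worker once
        await asyncio.sleep(0)  # workers run one iteration, then wait again

    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return log


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With two calls to `set()`, each worker name should appear in `log` exactly twice. With a plain `asyncio.Event` that is never cleared, the same loop would not block again after the first `set()`, which is the difference this primitive is meant to address.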