Skip to content

Instantly share code, notes, and snippets.

@numberoverzero
Created July 6, 2015 02:50
Show Gist options
  • Star 7 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save numberoverzero/1c6a8242e60873d2b801 to your computer and use it in GitHub Desktop.
Save numberoverzero/1c6a8242e60873d2b801 to your computer and use it in GitHub Desktop.
Variable that can be `await`ed on to reach a certain value, without blocking an event loop
import asyncio
missing = object()
class Value:
'''
Simple class that enables `await value.wait_for(foo)` to wait until the
variable is set to the expected value, without blocking the event loop.
Loosely modeled after asyncio.Event and asyncio.Condition
Usage:
import asyncio
loop = asyncio.new_event_loop()
signal = Value(loop=loop, value=False)
async def iterate_values(values, fut):
for value in values:
signal.value = value
print("Set signal to {}".format(value))
await asyncio.sleep(1, loop=loop)
fut.set_result(True)
async def print_when(value):
await signal.wait_for(value)
print("Signal reached desired value {}".format(value))
complete = asyncio.Future(loop=loop)
loop.create_task(print_when("foo"))
loop.create_task(print_when(3))
loop.create_task(iterate_values([1, 2, 3, "foo", 4], complete))
loop.run_until_complete(complete)
'''
def __init__(self, loop, value=None, expected=missing):
self.loop = loop
self.watchers = set()
self._value = value
self._expected = expected
@property
def value(self):
return self._value
@value.setter
def value(self, v):
self._value = v
# FOrce all `wait_for` to re-check the value
for watcher in self.watchers:
watcher.set()
async def wait_for(self, value):
'''
Yields when the Value is set to the given value.
Doesn't block the event loop with a busy `while` loop.
'''
# Don't wait if we're already at the expected value
if value == self.value:
return
watcher = asyncio.Event(loop=self.loop)
self.watchers.add(watcher)
# We'll only re-check the condition when the value changes,
# yielding back to the poller when it's not equal.
while self.value != value:
await watcher.wait()
watcher.clear()
# Clean up the watcher now that we've reached the expected value
self.watchers.remove(watcher)
async def wait(self):
'''
Waits until the Value is set to the given `expected` value.
Raises if `expected` was not set.
'''
if self._expected is missing:
raise RuntimeError(
"Must define `expected` when intializing the Value")
await self.wait_for(self._expected)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment