Skip to content

Instantly share code, notes, and snippets.

@basak
Last active December 30, 2015 01:05
Show Gist options
  • Save basak/c0a8f4f5a51d7fce1cc7 to your computer and use it in GitHub Desktop.
Save basak/c0a8f4f5a51d7fce1cc7 to your computer and use it in GitHub Desktop.
asyncio event and worker pattern thoughts
import asyncio
class Worker:
'''A Worker has background asyncio tasks that are cancelled on __del__'''
def __init__(self, loop=None):
self._loop = loop if loop else asyncio.get_event_loop()
# This set serves to keep incomplete tasks alive as long as this class
# instance exists.
self._tasks = set()
def __del__(self):
# As we cancel tasks, they may disappear from the set which
# could invalidate the iteration, so take a copy first.
tasks = set(self._tasks)
for task in tasks:
task.cancel()
@asyncio.coroutine
def _wait_coroutine(self, coroutine):
'''Wait for coroutine to finish and then clean up'''
yield from coroutine
self._tasks.remove(asyncio.Task.current_task())
def create_task(self, coroutine):
task = self._loop.create_task(self._wait_coroutine(coroutine))
self._tasks.add(task)
return task
class _FutureLinkedListItem:
def __init__(self):
self.future = asyncio.Future()
self.next = None
class EventQueue:
def __init__(self):
self._next_future_item = _FutureLinkedListItem()
# XXX This is a hack. Is there a better way?
# In KeyedEventQueue below, an underlying EventQueue might disappear if
# a caller holds on to a future generated by the EventQueue but not to
# the EventQueue itself. Then an incoming event will fail to find the
# EventQueue, create a new one, and then the original future will never
# fire. So as a workaround make sure the future holds on to a reference
# to the EventQueue itself, so that the EventQueue is not destroyed
# before the future.
self._next_future_item.future.__keep = self
def __iter__(self):
return _EventQueueReader(self)
def _get_next_future_item(self, item):
if item.next is None:
item.next = _FutureLinkedListItem()
# XXX same hack as in class constructor
item.next.future.__keep = self
return item.next
def _move_along(self):
self._next_future_item = self._get_next_future_item(self._next_future_item)
def send_result(self, result):
self._next_future_item.future.set_result(result)
self._move_along()
def send_exception(self, exception):
self._next_future_item.future.set_exception(exception)
self._move_along()
class _EventQueueReader:
def __init__(self, parent):
self.parent = parent
self._next_future_item = parent._next_future_item
def _move_along(self):
self._next_future_item = self.parent._get_next_future_item(self._next_future_item)
def __iter__(self):
return self
def __next__(self):
future = self._next_future_item.future
self._move_along()
return future
class KeyedEventQueue:
def __init__(self, parent, key_func, data_func=lambda x:x):
self.parent = parent
self.key_func = key_func
self.data_func = data_func
self._queues = weakref.WeakValueDictionary()
self._worker = Worker()
self._worker.create_task(self._start())
@asyncio.coroutine
def _start(self):
for future in self.parent:
try:
result = yield from future
except:
# Swallow exceptions. Keyed queue readers can never see them
# by definition, since we don't know how to filter them.
# Listeners interested in exception events need to watch the
# master queue.
pass
data_key = self.key_func(result)
data_value = self.data_func(result)
self[data_key].send_result(data_value)
def __getitem__(self, k):
try:
return self._queues[k]
except KeyError:
new_queue = EventQueue()
self._queues[k] = new_queue
return new_queue
#!/usr/bin/python3
import asyncio
import functools
import subprocess
import time
import util
class PowerStateWatcher:
def __init__(self, loop=None):
self.loop = loop or asyncio.get_event_loop()
self.power_events = util.EventQueue()
self._worker = util.Worker(loop=loop)
self._worker.create_task(self.watch_wakeups())
@asyncio.coroutine
def watch_wakeups(self):
create = asyncio.create_subprocess_exec('stdbuf', '-o0', 'powerd-cli', 'listen', stdout=subprocess.PIPE)
proc = yield from create
try:
while True:
line = yield from proc.stdout.readline()
if line == b"Received SysPowerStateChange: state=0\n":
self.power_events.send_result(0)
elif line == b"Received SysPowerStateChange: state=1\n":
self.power_events.send_result(1)
finally:
proc.kill()
class Clock:
def __init__(self, period, loop=None):
self.loop = loop or asyncio.get_event_loop()
self.trigger_events = util.EventQueue()
self._worker = util.Worker(loop=loop)
self._worker.create_task(self._watch(period))
@asyncio.coroutine
def _watch(self, period):
watcher = PowerStateWatcher(self.loop)
power_events = iter(watcher.power_events)
future_power_event = next(power_events)
deadline = time.time() + period
while True:
time_remaining = deadline - time.time()
if time_remaining < 0:
self.trigger_events.send_result(-time_remaining)
deadline = time.time() + period
else:
yield from asyncio.wait(
[future_power_event],
timeout=time_remaining,
loop=self.loop,
)
if future_power_event.done():
future_power_event = next(power_events)
@asyncio.coroutine
def create_periodic_reports(loop):
clock = Clock(period=300, loop=loop)
for future_trigger in clock.trigger_events:
tardiness = yield from future_trigger
# DO SOMETHING HERE
def main():
loop = asyncio.get_event_loop()
loop.create_task(create_periodic_reports(loop))
loop.run_forever()
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment