Skip to content

Instantly share code, notes, and snippets.

@agoose77
Last active April 9, 2020 18:28
Show Gist options
  • Save agoose77/ae540c614d165c11aec58872178eb8dd to your computer and use it in GitHub Desktop.
Save agoose77/ae540c614d165c11aec58872178eb8dd to your computer and use it in GitHub Desktop.
from collections import deque, namedtuple
from time import time
from enum import auto, Enum
from operator import itemgetter
class Trap(Enum):
    """Requests a running coroutine can make of the scheduler.

    A coroutine yields a tuple whose first element is one of these members;
    the remaining elements are the request's arguments.
    """

    # (Trap.SLEEP, seconds): park the task until the delay has elapsed.
    SLEEP = auto()
    # (Trap.SPAWN, coro, delay): schedule a new task and get its handle back.
    SPAWN = auto()
    # (Trap.WHOAMI,): get the handle of the task that is currently running.
    WHOAMI = auto()
    # (Trap.CANCEL, task): ask for the given task to be cancelled.
    CANCEL = auto()
class TaskCancelled(Exception):
    """Thrown into a task's coroutine to tell it that it has been cancelled."""
class Task:
    """Scheduler-side handle wrapping one generator-based coroutine.

    The scheduler stores what should be delivered on the next resumption in
    ``next_result`` (a value to send) or ``next_exc`` (an exception to throw)
    and then calls :meth:`update`.
    """

    def __init__(self, coro):
        """Wrap ``coro``, a generator that has not been started yet."""
        self.coro = coro
        # Exception (class or instance) to throw in on the next update, or
        # None when nothing is pending.
        self.next_exc = None
        # Value sent into the coroutine's pending ``yield`` on the next update.
        self.next_result = None

    def cancel(self):
        """Arrange for ``TaskCancelled`` to be thrown in on the next update."""
        self.next_exc = TaskCancelled

    def update(self):
        """Resume the coroutine once and return the next value it yields.

        A pending exception takes priority over a pending result.  Both are
        consumed by this call, so each is delivered at most once: a coroutine
        that catches ``TaskCancelled`` to clean up is not hit by the same
        cancellation again, and a result meant for one ``yield`` is not
        replayed into later ones.

        Raises:
            StopIteration: the coroutine returned.
            Exception: whatever the coroutine itself raises or lets
                propagate (including a thrown-in pending exception).
        """
        # Clear the slots *before* resuming so anything set while the
        # coroutine runs (e.g. a self-cancel) survives until the next update.
        pending_exc, self.next_exc = self.next_exc, None
        pending_result, self.next_result = self.next_result, None
        if pending_exc is not None:
            return self.coro.throw(pending_exc)
        return self.coro.send(pending_result)
def sleep(dt=0.0):
    """Suspend the calling task for ``dt`` seconds.

    Intended to be used as ``yield from sleep(dt)`` inside a coroutine run by
    the scheduler.  With the default of ``0.0`` it just gives other tasks a
    turn.  Produces no result.
    """
    request = (Trap.SLEEP, dt)
    yield request
def whoami():
    """Return the task handle of the calling coroutine.

    Intended to be used as ``me = yield from whoami()``; the value is whatever
    the scheduler sends back in response to the ``WHOAMI`` trap.
    """
    request = (Trap.WHOAMI,)  # one-element tuple: this trap has no arguments
    current_task = yield request
    return current_task
def spawn_task(coro, delay=None):
    """Ask the scheduler to run ``coro`` as a new task and return its handle.

    Intended to be used as ``task = yield from spawn_task(coro)``.

    Args:
        coro: generator to schedule.
        delay: seconds to wait before the new task first runs, or ``None``
            to make it runnable straight away.
    """
    request = (Trap.SPAWN, coro, delay)
    spawned = yield request
    return spawned
def cancel(task):
    """Ask the scheduler to cancel ``task``.

    Intended to be used as ``yield from cancel(task)``.  The scheduler does
    not provide a meaningful result for this trap, so the returned value
    should be ignored.
    """
    request = (Trap.CANCEL, task)
    reply = yield request
    return reply
def child():
    """Demo coroutine: print a message forever, yielding between prints.

    It never returns on its own; it only stops when an exception (such as a
    cancellation) is thrown into it.
    """
    while True:
        print("I'm a child")
        # A zero-length sleep hands control back to the scheduler each
        # iteration (and is what makes this function a generator).
        yield from sleep(0.0)
def example():
    """Demo coroutine exercising every trap: sleep, whoami, spawn, cancel."""
    # Sleep first, then find out which task we are running as.
    yield from sleep(1.5)
    this_task = yield from whoami()
    print("Now awake", this_task)

    # Start a child task and immediately request its cancellation.
    child_task = yield from spawn_task(child())
    print("Cancelling child!", child_task)
    yield from cancel(child_task)

    print("Done!")
_get_wake_time = itemgetter(0)
class Loop:
    """Cooperative round-robin scheduler for generator-based tasks.

    Coroutines talk to the loop by yielding tuples whose first element is a
    ``Trap`` member; yielding anything else simply gives other tasks a turn.
    """

    def __init__(self):
        # Tasks ready to be resumed, in FIFO order.
        self._active_tasks = deque()
        # Every task that has not finished yet (active or sleeping).
        self._tasks = []
        # (wake_time, task) pairs kept sorted by wake_time, earliest first.
        self._sleeping_tasks = []

    def create_task(self, coro, delay=None):
        """Wrap ``coro`` in a Task, schedule it and return the Task.

        Args:
            coro: generator to run.
            delay: ``None`` to make the task runnable on the next step,
                otherwise the number of seconds to wait before its first run.
        """
        task = Task(coro)
        self._tasks.append(task)
        if delay is None:
            self._active_tasks.append(task)
        else:
            self._sleep_task(task, self._now() + delay)
        return task

    def run_until_complete(self, *initial_coros):
        """Schedule ``initial_coros`` and step the loop until no task remains.

        When every remaining task is asleep, the calling thread is blocked
        (via ``_idle``) until the earliest wake time instead of spinning at
        full CPU.
        """
        for coro in initial_coros:
            self.create_task(coro)
        while self._tasks:
            self.step()
            if self._tasks and not self._active_tasks and self._sleeping_tasks:
                wait = self._sleeping_tasks[0][0] - self._now()
                if wait > 0:
                    self._idle(wait)

    def step(self):
        """Wake due sleepers, then resume each currently active task once.

        This never blocks.  Tasks that become active during the step (newly
        spawned or woken by a cancellation) first run on the following step.

        Raises:
            Exception: anything other than StopIteration/TaskCancelled that
                escapes a task; the offending task is dropped first.
        """
        self._wake_due_tasks(self._now())
        # Fix the iteration count up front so tasks appended while stepping
        # are not resumed until the next step.
        for _ in range(len(self._active_tasks)):
            task = self._active_tasks.popleft()
            try:
                result = task.update()
            except (StopIteration, TaskCancelled):
                # Finished normally or accepted its cancellation.
                self._tasks.remove(task)
                continue
            except BaseException:
                # The coroutine is dead; forget it before propagating so it
                # cannot linger in ``_tasks`` and keep the loop alive forever.
                self._tasks.remove(task)
                raise
            # Whatever was sent in has been consumed; never replay a stale
            # result into a trap that defines none (SLEEP, CANCEL).
            task.next_result = None
            if isinstance(result, tuple) and result:
                trap, *args = result
                if trap is Trap.SLEEP:
                    delay, = args
                    self._sleep_task(task, self._now() + delay)
                    continue  # parked in the sleeping queue, not re-activated
                if trap is Trap.SPAWN:
                    coro, delay = args
                    task.next_result = self.create_task(coro, delay)
                elif trap is Trap.WHOAMI:
                    task.next_result = task
                elif trap is Trap.CANCEL:
                    target_task, = args
                    target_task.cancel()
                    # Deliver the cancellation promptly even if the target
                    # is in the middle of a long sleep.
                    self._wake_task(target_task)
            self._active_tasks.append(task)

    def _wake_due_tasks(self, now):
        """Move every sleeper whose wake time is <= ``now`` to the active queue."""
        sleeping = self._sleeping_tasks
        due = 0
        while due < len(sleeping) and sleeping[due][0] <= now:
            due += 1
        if due:
            self._active_tasks.extend(task for _, task in sleeping[:due])
            del sleeping[:due]

    def _wake_task(self, task):
        """Move ``task`` to the active queue if it is currently sleeping."""
        for index, (_, sleeper) in enumerate(self._sleeping_tasks):
            if sleeper is task:
                del self._sleeping_tasks[index]
                self._active_tasks.append(task)
                return

    def _sleep_task(self, task, wake_time):
        """Park ``task`` until ``wake_time`` (a value comparable to ``_now()``)."""
        self._sleeping_tasks.append((wake_time, task))
        # Sorting on the wake time only avoids comparing Task objects, and
        # the stable sort keeps FIFO order among equal wake times.
        self._sleeping_tasks.sort(key=_get_wake_time)

    def _now(self):
        """Return the current time in seconds; override to use another clock."""
        return time()

    def _idle(self, duration):
        """Block the calling thread for ``duration`` seconds.

        Used by ``run_until_complete`` when only sleeping tasks remain.
        Subclasses that override ``_now`` with a non-wall clock should
        override this as well.
        """
        # Imported locally under another name: the module-level ``sleep`` in
        # this file is the coroutine trap helper, not ``time.sleep``.
        from time import sleep as block_thread

        block_thread(duration)
# Demo: run example() to completion on a fresh loop, then report that the
# loop has no tasks left.
Loop().run_until_complete(example())
print("Finished")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment