Skip to content

Instantly share code, notes, and snippets.

@s-c-p
Last active March 18, 2018 11:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save s-c-p/3e7db60d3c6d710252e4d2e222e043be to your computer and use it in GitHub Desktop.
Save s-c-p/3e7db60d3c6d710252e4d2e222e043be to your computer and use it in GitHub Desktop.
# code from snarky.ca, making my own debugger cuz pdb doesn't halt time
from debugger import db # see gist named debugger.py
import datetime
import heapq
import types
import time
class Task:
"""Represent how long a coroutine should wait before starting again.
Comparison operators are implemented for use by heapq. Two-item
tuples unfortunately don't work because when the datetime.datetime
instances are equal, comparison falls to the coroutine and they don't
implement comparison methods, triggering an exception.
Think of this as being like asyncio.Task/curio.Task.
"""
def __init__(self, wait_until, coro):
self.coro = coro
self.waiting_until = wait_until
def __eq__(self, other):
return self.waiting_until == other.waiting_until
def __lt__(self, other):
return self.waiting_until < other.waiting_until
class SleepingLoop:
"""An event loop focused on delaying execution of coroutines.
Think of this as being like asyncio.BaseEventLoop/curio.Kernel.
"""
def __init__(self, *coros):
self._new = coros
self._waiting = []
def run_until_complete(self):
# Start all the coroutines.
for coro in self._new:
wait_for = coro.send(None)
heapq.heappush(self._waiting, Task(wait_for, coro))
# Keep running until there is no more work to do.
while self._waiting:
now = datetime.datetime.now()
# Get the coroutine with the soonest resumption time.
task = heapq.heappop(self._waiting)
if now < task.waiting_until:
# We're ahead of schedule; wait until it's time to resume.
delta = task.waiting_until - now
time.sleep(delta.total_seconds())
now = datetime.datetime.now()
try:
# It's time to resume the coroutine.
wait_until = task.coro.send(now)
heapq.heappush(self._waiting, Task(wait_until, task.coro))
except StopIteration:
# The coroutine is done.
pass
@types.coroutine
def sleep(seconds):
"""Pause a coroutine for the specified number of seconds.
Think of this as being like asyncio.sleep()/curio.sleep().
"""
now = datetime.datetime.now()
wait_until = now + datetime.timedelta(seconds=seconds)
# Make all coroutines on the call stack pause; the need to use `yield`
# necessitates this be generator-based and not an async-based coroutine.
actual = yield wait_until
# Resume the execution stack, sending back how long we actually waited.
return actual - now
async def countdown(label, length, *, delay=0):
"""Countdown a launch for `length` seconds, waiting `delay` seconds.
This is what a user would typically write.
"""
print(label, 'waiting', delay, 'seconds before starting countdown')
delta = await sleep(delay)
print(label, 'starting after waiting', delta)
while length:
print(label, 'T-minus', length)
waited = await sleep(1)
length -= 1
print(label, 'lift-off!')
def main():
"""Start the event loop, counting down 3 separate launches.
This is what a user would typically write.
"""
loop = SleepingLoop(countdown('A', 5), countdown('B', 3, delay=2),
countdown('C', 4, delay=1))
start = datetime.datetime.now()
db()
loop.run_until_complete()
print('Total elapsed time is', datetime.datetime.now() - start)
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment