Skip to content

Instantly share code, notes, and snippets.

@Mattosx
Forked from damonjw/LICENSE
Created September 17, 2018 07:13
Show Gist options
  • Save Mattosx/b0c5baa36c8d0115e40bbc316f6eece2 to your computer and use it in GitHub Desktop.
Save Mattosx/b0c5baa36c8d0115e40bbc316f6eece2 to your computer and use it in GitHub Desktop.
Event driven simulator in Python, using async/await
import asyncio
import collections
import heapq
class EventSimulator(asyncio.AbstractEventLoop):
    '''A simple event-driven simulator, using async/await.

    The simulator is an asyncio event loop whose clock is simulated: time
    only moves when the next scheduled timer is taken off the heap, so
    ``await asyncio.sleep(x)`` advances ``time()`` by ``x`` without waiting.

    Requires Python 3.7+ (asyncio passes ``context=`` to ``call_soon``).
    '''

    def __init__(self):
        self._time = 0
        self._running = False
        self._closed = False
        # FIFO of asyncio.Handle objects to run at the current simulated time.
        self._immediate = collections.deque()
        # Heap of asyncio.TimerHandle objects, ordered by their "when".
        self._scheduled = []
        # Exception captured from a callback/coroutine, re-raised by run_forever.
        self._exc = None

    def get_debug(self):
        # A silly hangover from the way asyncio is written
        return False

    def time(self):
        '''Return the current simulated time.'''
        return self._time

    # Methods for running the event loop.
    # For a real asyncio system you need to worry a lot more about running+closed.
    # For a simple simulator, we completely control the passage of time, so most of
    # these functions are trivial.
    #
    # I've split the pending events into _immediate and _scheduled.
    # I could put them all in _scheduled; but if there are lots of
    # _immediate calls there's no need for them to clutter up the heap.

    def run_forever(self):
        '''Run pending events until none are left or stop() is called.

        Raises:
            Whatever exception was captured from a callback or coroutine
            (see call_exception_handler and create_task).
        '''
        # asyncio.sleep() and friends look up the *running* loop, so register
        # ourselves for the duration of the run and restore the previous one.
        previous_loop = asyncio.events._get_running_loop()
        asyncio.events._set_running_loop(self)
        self._running = True
        try:
            while self._running and (self._immediate or self._scheduled):
                if self._immediate:
                    handle = self._immediate.popleft()
                else:
                    handle = heapq.heappop(self._scheduled)
                    handle._scheduled = False  # just for asyncio.TimerHandle debugging?
                    # A cancelled timer is a non-event: don't let it move the clock.
                    if not handle.cancelled():
                        self._time = handle.when()
                if not handle.cancelled():
                    handle._run()
                if self._exc is not None:
                    # Clear the stored exception so a later run starts clean.
                    exc, self._exc = self._exc, None
                    raise exc
        finally:
            self._running = False
            asyncio.events._set_running_loop(previous_loop)

    def run_until_complete(self, future):
        raise NotImplementedError

    def _timer_handle_cancelled(self, handle):
        # We could remove the handle from _scheduled, but instead we'll just skip
        # over it in the "run_forever" method.
        pass

    def is_running(self):
        return self._running

    def is_closed(self):
        # Closed only after close(); a loop that has not started is not closed.
        return self._closed

    def stop(self):
        self._running = False

    def close(self):
        self._running = False
        self._closed = True

    def shutdown_asyncgens(self):
        pass

    def call_exception_handler(self, context):
        # If there is any exception in a callback, this method gets called.
        # I'll store the exception in self._exc, so that the main simulation loop picks it up.
        self._exc = context.get('exception', None)

    # Methods for scheduling jobs.
    #
    # If the job is a coroutine, the end-user should call asyncio.ensure_future(coro()).
    # The asyncio machinery will invoke loop.create_task(). Asyncio will then
    # run the coroutine in pieces, breaking it apart at async/await points, and every time it
    # will construct an appropriate callback and call loop.call_soon(cb).
    #
    # If the job is a plain function, the end-user should call one of the loop.call_*()
    # methods directly.

    def call_soon(self, callback, *args, context=None):
        '''Queue callback(*args) to run at the current simulated time.

        ``context`` is accepted because asyncio Futures and Tasks pass it.
        '''
        handle = asyncio.Handle(callback, args, self, context=context)
        self._immediate.append(handle)
        return handle

    def call_later(self, delay, callback, *args, context=None):
        '''Schedule callback(*args) ``delay`` time units after the current time.

        Raises:
            ValueError: if ``delay`` is negative.
        '''
        if delay < 0:
            raise ValueError("Can't schedule in the past")
        return self.call_at(self._time + delay, callback, *args, context=context)

    def call_at(self, when, callback, *args, context=None):
        '''Schedule callback(*args) at absolute simulated time ``when``.

        Raises:
            ValueError: if ``when`` is earlier than the current time.
        '''
        if when < self._time:
            raise ValueError("Can't schedule in the past")
        handle = asyncio.TimerHandle(when, callback, args, self, context=context)
        heapq.heappush(self._scheduled, handle)
        handle._scheduled = True  # perhaps just for debugging in asyncio.TimerHandle?
        return handle

    def create_task(self, coro, **kwargs):
        '''Wrap ``coro`` in a Task bound to this loop.

        Extra keyword arguments (e.g. ``name``, ``context``) are forwarded to
        asyncio.Task unchanged.
        '''
        # Since this is a simulator, I'll run a plain simulated-time event loop, and
        # if there are any exceptions inside any coroutine I'll pass them on to the
        # event loop, so it can halt and print them out. To achieve this, I need the
        # exception to be caught in "async mode" i.e. by a coroutine, and then
        # use self._exc to pass it on to "regular execution mode".
        async def wrapper():
            try:
                # Hand the coroutine's value through so task.result() is useful.
                return await coro
            except Exception as e:
                print("Wrapped exception")
                self._exc = e

        return asyncio.Task(wrapper(), loop=self, **kwargs)

    def create_future(self):
        # Not sure why this is here rather than in AbstractEventLoop.
        return asyncio.Future(loop=self)
import asyncio, heapq
from eventsim import EventSimulator
class ProcessorSharingQueue:
    '''A processor-sharing server: all queued jobs are served simultaneously,
    each at an equal share of the service rate.

    ``process(work)`` returns a future that is resolved with the time the job
    spent in the queue (completion time minus submission time).

    Internally ``_work_done`` is the amount of service (in units of
    work / service_rate) that every job present has received so far; a job
    submitted when ``_work_done == v`` with requirement ``w`` finishes when
    ``_work_done`` reaches ``v + w``.
    '''

    def __init__(self, service_rate=1, loop=None):
        self._service_rate = service_rate
        # Heap of (finish_mark, seq, submit_time, future).
        self._queue = []
        self._loop = loop if loop is not None else asyncio.get_event_loop()
        # Timer handle for the next completion, or None when idle.
        self._done = None
        self._work_done = 0
        self._last_time = self._loop.time()
        # Tie-breaker so the heap never has to compare two futures.
        self._seq = 0

    def process(self, work):
        '''Submit a job needing ``work`` units; return a future for its sojourn time.'''
        t = self._advance_clock()
        fut = self._loop.create_future()
        w = work / self._service_rate
        heapq.heappush(self._queue, (self._work_done + w, self._seq, t, fut))
        self._seq += 1
        # The new arrival slows everyone down, so the pending completion
        # timer (if any) is stale and must be replaced.
        if self._done is not None:
            self._done.cancel()
        self._schedule()
        return fut

    def complete(self):
        '''Timer callback: finish the job at the head of the heap.'''
        t = self._advance_clock()
        (_, _, tstart, fut) = heapq.heappop(self._queue)
        # The waiter may have cancelled the future; set_result would raise.
        if not fut.done():
            fut.set_result(t - tstart)
        self._schedule()

    def _advance_clock(self):
        '''Credit service received since the last update; return the current time.'''
        t = self._loop.time()
        if self._queue:
            self._work_done += (t - self._last_time) / len(self._queue)
        self._last_time = t
        return t

    def _schedule(self):
        '''Arm a timer for the next completion, or go idle if the queue is empty.'''
        if not self._queue:
            self._done = None
        else:
            finish_mark = self._queue[0][0]
            # Floating-point round-off can leave the head a hair "overdue";
            # clamp so we never ask the loop to schedule in the past.
            dt = max(0, (finish_mark - self._work_done) * len(self._queue))
            self._done = self._loop.call_later(dt, self.complete)
class FIFOQueue:
    '''A first-in-first-out server that works on one job at a time.

    ``process(work)`` returns a future that is resolved, when the job
    finishes service, with the job's own service time (work / service_rate).
    '''

    def __init__(self, service_rate=1, loop=None):
        self._service_rate = service_rate
        # List of (service_time, future); index 0 is the job in service.
        self._queue = []
        self._loop = loop if loop is not None else asyncio.get_event_loop()
        # Timer handle for the job in service, or None when idle.
        self._done = None

    def process(self, work):
        '''Submit a job needing ``work`` units; return a future for its completion.'''
        fut = self._loop.create_future()
        w = work / self._service_rate
        self._queue.append((w, fut))
        # Only an idle server starts the new job straight away; otherwise
        # complete() will start it when its turn comes.
        if self._done is None:
            self._done = self._loop.call_later(w, self.complete)
        return fut

    def complete(self):
        '''Timer callback: finish the job in service and start the next one.'''
        # Dequeue first so a problem resolving the future cannot stall the queue.
        w, fut = self._queue.pop(0)
        # The waiter may have cancelled the future; set_result would raise.
        if not fut.done():
            fut.set_result(w)
        if self._queue:
            next_w, _ = self._queue[0]
            self._done = self._loop.call_later(next_w, self.complete)
        else:
            self._done = None
#------------------------------------------------
# Demo: two jobs share one processor-sharing queue in simulated time.
loop = EventSimulator()
asyncio.set_event_loop(loop)
# Pass the simulator explicitly rather than relying on the implicit
# "current event loop" lookup.
q = ProcessorSharingQueue(loop=loop)


async def queueing_job(i=1):
    '''Wait ``i`` time units, submit 4 units of work to ``q``, and report timings.'''
    print(loop.time(), "Start job {}".format(i))
    await asyncio.sleep(i)
    print(loop.time(), "Sending job {}".format(i))
    xmit = q.process(work=4)
    await xmit
    print(loop.time(), "Done job {} in time {}".format(i, xmit.result()))


asyncio.ensure_future(queueing_job(1), loop=loop)
asyncio.ensure_future(queueing_job(3), loop=loop)
loop.run_forever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment