Skip to content

Instantly share code, notes, and snippets.

@tdamsma
Forked from damonjw/LICENSE
Created March 16, 2022 15:32
Show Gist options
  • Save tdamsma/37fe65cee356ce6c0fc00801932d044e to your computer and use it in GitHub Desktop.
Save tdamsma/37fe65cee356ce6c0fc00801932d044e to your computer and use it in GitHub Desktop.
Event driven simulator in Python, using async/await
# Copyright (c) 2021 Damon Wischik. See LICENSE for permissions.
import heapq
import asyncio
class EventSimulator(asyncio.AbstractEventLoop):
'''A simple event-driven simulator, using async/await'''
def __init__(self):
self._time = 0
self._running = False
self._immediate = []
self._scheduled = []
self._exc = None
def get_debug(self):
# A silly hangover from the way asyncio is written
return False
def time(self):
return self._time
# Methods for running the event loop.
# For a real asyncio system you need to worry a lot more about running+closed.
# For a simple simulator, we completely control the passage of time, so most of
# these functions are trivial.
#
# I've split the pending events into _immediate and _scheduled.
# I could put them all in _scheduled; but if there are lots of
# _immediate calls there's no need for them to clutter up the heap.
def run_forever(self):
self._running = True
while (self._immediate or self._scheduled) and self._running:
if self._immediate:
h = self._immediate[0]
self._immediate = self._immediate[1:]
else:
h = heapq.heappop(self._scheduled)
self._time = h._when
h._scheduled = False # just for asyncio.TimerHandle debugging?
if not h._cancelled:
h._run()
if self._exc is not None:
raise self._exc
def run_until_complete(self, future):
raise NotImplementedError
def _timer_handle_cancelled(self, handle):
# We could remove the handle from _scheduled, but instead we'll just skip
# over it in the "run_forever" method.
pass
def is_running(self):
return self._running
def is_closed(self):
return not self._running
def stop(self):
self._running = False
def close(self):
self._running = False
def shutdown_asyncgens(self):
pass
def call_exception_handler(self, context):
# If there is any exception in a callback, this method gets called.
# I'll store the exception in self._exc, so that the main simulation loop picks it up.
self._exc = context.get('exception', None)
# Methods for scheduling jobs.
#
# If the job is a coroutine, the end-user should call asyncio.ensure_future(coro()).
# The asyncio machinery will invoke loop.create_task(). Asyncio will then
# run the coroutine in pieces, breaking it apart at async/await points, and every time it
# will construct an appropriate callback and call loop.call_soon(cb).
#
# If the job is a plain function, the end-user should call one of the loop.call_*()
# methods directly.
def call_soon(self, callback, *args):
h = asyncio.Handle(callback, args, self)
self._immediate.append(h)
return h
def call_later(self, delay, callback, *args):
if delay < 0:
raise Exception("Can't schedule in the past")
return self.call_at(self._time + delay, callback, *args)
def call_at(self, when, callback, *args):
if when < self._time:
raise Exception("Can't schedule in the past")
h = asyncio.TimerHandle(when, callback, args, self)
heapq.heappush(self._scheduled, h)
h._scheduled = True # perhaps just for debugging in asyncio.TimerHandle?
return h
def create_task(self, coro):
# Since this is a simulator, I'll run a plain simulated-time event loop, and
# if there are any exceptions inside any coroutine I'll pass them on to the
# event loop, so it can halt and print them out. To achieve this, I need the
# exception to be caught in "async mode" i.e. by a coroutine, and then
# use self._exc to pass it on to "regular execution mode".
async def wrapper():
try:
await coro
except Exception as e:
print("Wrapped exception")
self._exc = e
return asyncio.Task(wrapper(), loop=self)
def create_future(self):
# Not sure why this is here rather than in AbstractEventLoop.
return asyncio.Future(loop=self)
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
# Copyright (c) 2021 Damon Wischik. See LICENSE for permissions.
import asyncio, heapq
from eventsim import EventSimulator
class ProcessorSharingQueue:
def __init__(self, service_rate=1, loop=None):
self._service_rate = service_rate
self._queue = []
self._loop = loop if loop else asyncio.get_event_loop()
self._done = None
self._work_done = 0
self._last_time = self._loop.time()
def process(self, work):
t = self._advance_clock()
fut = self._loop.create_future()
w = work / self._service_rate
heapq.heappush(self._queue, (self._work_done+w, t, fut))
if self._done:
self._done.cancel()
self._schedule()
return fut
def complete(self):
t = self._advance_clock()
(_, tstart, fut) = heapq.heappop(self._queue)
fut.set_result(t - tstart)
self._schedule()
def _advance_clock(self):
t = self._loop.time()
if self._queue:
self._work_done += (t - self._last_time) / len(self._queue)
self._last_time = t
return t
def _schedule(self):
if not self._queue:
self._done = None
else:
w,_,_ = self._queue[0]
dt = (w - self._work_done) * len(self._queue)
self._done = self._loop.call_later(dt, self.complete)
class FIFOQueue:
def __init__(self, service_rate=1, loop=None):
self._service_rate = service_rate
self._queue = []
self._loop = loop if loop else asyncio.get_event_loop()
self._done = None
def process(self, work):
fut = self._loop.create_future()
w = work / self._service_rate
self._queue.append((w, fut))
if not self._done:
self._done = self._loop.call_later(w, self.complete)
return fut
def complete(self):
w,fut = self._queue[0]
fut.set_result(w)
self._queue = self._queue[1:]
if self._queue:
w,_ = self._queue[0]
self._done = self._loop.call_later(w, self.complete)
else:
self._done = None
#------------------------------------------------
loop = EventSimulator()
asyncio.set_event_loop(loop)
q = ProcessorSharingQueue()
async def queueing_job(i=1):
print(loop.time(), "Start job {}".format(i))
await asyncio.sleep(i)
print(loop.time(), "Sending job {}".format(i))
xmit = q.process(work=4)
await xmit
print(loop.time(), "Done job {} in time {}".format(i, xmit.result()))
asyncio.ensure_future(queueing_job(1))
asyncio.ensure_future(queueing_job(3))
loop.run_forever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment