Skip to content

Instantly share code, notes, and snippets.

@damonjw
Last active March 23, 2024 10:28
Show Gist options
  • Star 20 You must be signed in to star a gist
  • Fork 7 You must be signed in to fork a gist
  • Save damonjw/35aac361ca5d313ee9bf79e00261f4ea to your computer and use it in GitHub Desktop.
Save damonjw/35aac361ca5d313ee9bf79e00261f4ea to your computer and use it in GitHub Desktop.
Event driven simulator in Python, using async/await
# Copyright (c) 2021 Damon Wischik. See LICENSE for permissions.
import heapq
import asyncio
class EventSimulator(asyncio.AbstractEventLoop):
'''A simple event-driven simulator, using async/await'''
def __init__(self):
self._time = 0
self._running = False
self._immediate = []
self._scheduled = []
self._exc = None
def get_debug(self):
# A silly hangover from the way asyncio is written
return False
def time(self):
return self._time
# Methods for running the event loop.
# For a real asyncio system you need to worry a lot more about running+closed.
# For a simple simulator, we completely control the passage of time, so most of
# these functions are trivial.
#
# I've split the pending events into _immediate and _scheduled.
# I could put them all in _scheduled; but if there are lots of
# _immediate calls there's no need for them to clutter up the heap.
def run_forever(self):
self._running = True
while (self._immediate or self._scheduled) and self._running:
if self._immediate:
h = self._immediate[0]
self._immediate = self._immediate[1:]
else:
h = heapq.heappop(self._scheduled)
self._time = h._when
h._scheduled = False # just for asyncio.TimerHandle debugging?
if not h._cancelled:
h._run()
if self._exc is not None:
raise self._exc
def run_until_complete(self, future):
raise NotImplementedError
def _timer_handle_cancelled(self, handle):
# We could remove the handle from _scheduled, but instead we'll just skip
# over it in the "run_forever" method.
pass
def is_running(self):
return self._running
def is_closed(self):
return not self._running
def stop(self):
self._running = False
def close(self):
self._running = False
def shutdown_asyncgens(self):
pass
def call_exception_handler(self, context):
# If there is any exception in a callback, this method gets called.
# I'll store the exception in self._exc, so that the main simulation loop picks it up.
self._exc = context.get('exception', None)
# Methods for scheduling jobs.
#
# If the job is a coroutine, the end-user should call asyncio.ensure_future(coro()).
# The asyncio machinery will invoke loop.create_task(). Asyncio will then
# run the coroutine in pieces, breaking it apart at async/await points, and every time it
# will construct an appropriate callback and call loop.call_soon(cb).
#
# If the job is a plain function, the end-user should call one of the loop.call_*()
# methods directly.
def call_soon(self, callback, *args):
h = asyncio.Handle(callback, args, self)
self._immediate.append(h)
return h
def call_later(self, delay, callback, *args):
if delay < 0:
raise Exception("Can't schedule in the past")
return self.call_at(self._time + delay, callback, *args)
def call_at(self, when, callback, *args):
if when < self._time:
raise Exception("Can't schedule in the past")
h = asyncio.TimerHandle(when, callback, args, self)
heapq.heappush(self._scheduled, h)
h._scheduled = True # perhaps just for debugging in asyncio.TimerHandle?
return h
def create_task(self, coro):
# Since this is a simulator, I'll run a plain simulated-time event loop, and
# if there are any exceptions inside any coroutine I'll pass them on to the
# event loop, so it can halt and print them out. To achieve this, I need the
# exception to be caught in "async mode" i.e. by a coroutine, and then
# use self._exc to pass it on to "regular execution mode".
async def wrapper():
try:
await coro
except Exception as e:
print("Wrapped exception")
self._exc = e
return asyncio.Task(wrapper(), loop=self)
def create_future(self):
# Not sure why this is here rather than in AbstractEventLoop.
return asyncio.Future(loop=self)
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
# Copyright (c) 2021 Damon Wischik. See LICENSE for permissions.
import asyncio, heapq
from eventsim import EventSimulator
class ProcessorSharingQueue:
def __init__(self, service_rate=1, loop=None):
self._service_rate = service_rate
self._queue = []
self._loop = loop if loop else asyncio.get_event_loop()
self._done = None
self._work_done = 0
self._last_time = self._loop.time()
def process(self, work):
t = self._advance_clock()
fut = self._loop.create_future()
w = work / self._service_rate
heapq.heappush(self._queue, (self._work_done+w, t, fut))
if self._done:
self._done.cancel()
self._schedule()
return fut
def complete(self):
t = self._advance_clock()
(_, tstart, fut) = heapq.heappop(self._queue)
fut.set_result(t - tstart)
self._schedule()
def _advance_clock(self):
t = self._loop.time()
if self._queue:
self._work_done += (t - self._last_time) / len(self._queue)
self._last_time = t
return t
def _schedule(self):
if not self._queue:
self._done = None
else:
w,_,_ = self._queue[0]
dt = (w - self._work_done) * len(self._queue)
self._done = self._loop.call_later(dt, self.complete)
class FIFOQueue:
def __init__(self, service_rate=1, loop=None):
self._service_rate = service_rate
self._queue = []
self._loop = loop if loop else asyncio.get_event_loop()
self._done = None
def process(self, work):
fut = self._loop.create_future()
w = work / self._service_rate
self._queue.append((w, fut))
if not self._done:
self._done = self._loop.call_later(w, self.complete)
return fut
def complete(self):
w,fut = self._queue[0]
fut.set_result(w)
self._queue = self._queue[1:]
if self._queue:
w,_ = self._queue[0]
self._done = self._loop.call_later(w, self.complete)
else:
self._done = None
#------------------------------------------------
loop = EventSimulator()
asyncio.set_event_loop(loop)
q = ProcessorSharingQueue()
async def queueing_job(i=1):
print(loop.time(), "Start job {}".format(i))
await asyncio.sleep(i)
print(loop.time(), "Sending job {}".format(i))
xmit = q.process(work=4)
await xmit
print(loop.time(), "Done job {} in time {}".format(i, xmit.result()))
asyncio.ensure_future(queueing_job(1))
asyncio.ensure_future(queueing_job(3))
loop.run_forever()
@damonjw
Copy link
Author

damonjw commented Aug 3, 2017

Writing an event-driven simulator in Python, using async/await

The first file eventsim.py is a generic event loop, implementing asyncio.AbstractEventLoop. It doesn't implement any of the methods to do with actual real network access etc., it just maintains a virtual clock and a heap of events waiting to fire. Because it implements asyncio.AbstractEventLoop, all the Python 3.5 cleverness of async/await can use it automatically.

The second file sim.py is an example, using the generic event loop to simulate either a processor sharing queue or a FIFO queue.

@aisk
Copy link

aisk commented May 16, 2019

Looks like this codes dose not run on Python3.7, now Python will pass a new keyword parameter context to call_soon.

@oeway
Copy link

oeway commented Dec 12, 2020

@damonjw Thanks for the great work, I tried to use it with Pyodide and it works! A minor fix I did is to change add **kwargs to def call_soon(self, callback, *args, **kwargs):.

Would you mind to attach a license to this file so I can use it in a open source project (MIT license)?

@tdamsma
Copy link

tdamsma commented Mar 9, 2022

I stumbled upon this code whilst researching if/how I could use asyncio (with manual control of time passage) to do Discrete Event Simulations. It seems so obvious to me, but this is the first application of the idea I found. Thanks, very useful!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment