Skip to content

Instantly share code, notes, and snippets.

@soravux
Last active December 20, 2015 06:08
Show Gist options
  • Save soravux/6083363 to your computer and use it in GitHub Desktop.
Save soravux/6083363 to your computer and use it in GitHub Desktop.
Proof of concept asynchronous scheduler in Python
import time
import threading
import sched
import signal
class myAsyncSched(object):
    """Asynchronous scheduler built on ``sched.scheduler`` and ``threading.Event``.

    Tasks can be added or removed while :meth:`run` is executing in another
    thread. Every task is invoked with its positional arguments plus a
    ``start`` keyword argument holding the ``time.time()`` timestamp taken
    when this scheduler was created.
    """

    def __init__(self):
        # Time-ordered event queue; run() only ever drains it with
        # blocking=False, so the sleep function is never used to wait.
        self.s = sched.scheduler(time.time, time.sleep)
        # Wake-up signal for the loop in run(): set whenever a task is added
        # or a stop is requested.
        self.e = threading.Event()
        # Set by stop() to make the loop in run() terminate.
        self._stop = threading.Event()
        # Reference timestamp passed to every task as ``start``.
        self.start = time.time()

    def addTask(self, delay, task, *args):
        """Add a new task to the scheduler.

        Args:
            delay: Seconds from now until the task becomes due.
            task: Callable invoked as ``task(*args, start=self.start)``.
            *args: Positional arguments for ``task``. The first one is also
                the key that :meth:`rmTask` matches against.
        """
        self.s.enter(delay, 1, task, argument=args,
                     kwargs={'start': self.start})
        # Wake the loop so it recomputes how long it is allowed to sleep.
        self.e.set()

    def stop(self):
        """Stop the scheduler.

        Raises the stop flag and wakes the loop so that :meth:`run` returns.
        """
        self._stop.set()
        self.e.set()

    def rmTask(self, arg):
        """Remove every queued task whose first positional argument equals *arg*.

        Tasks that were scheduled without positional arguments have no key
        and are never matched. Nothing happens when no task matches.
        """
        # ``queue`` returns a snapshot list, so cancelling while iterating
        # is safe.
        for event in self.s.queue:
            if event.argument and event.argument[0] == arg:
                try:
                    self.s.cancel(event)
                except ValueError:
                    # The event left the queue (it ran or was cancelled)
                    # after the snapshot was taken; it is already gone.
                    pass

    def run(self):
        """Execute the main loop of the scheduler.

        This is meant to be executed in a new thread. The loop sleeps until
        the next task is due or until it is woken by :meth:`addTask` or
        :meth:`stop`, then runs every task that is due. It returns once
        :meth:`stop` has been called.

        NOTE: per the ``sched`` documentation, an exception raised by a task
        propagates out of ``scheduler.run`` and therefore ends this loop.
        """
        # None means "wait until signalled"; otherwise it is the number of
        # seconds until the next queued task is due.
        max_wait = None
        while not self._stop.is_set():
            self.e.wait(max_wait)
            # Clear before draining: a task added while the queue is being
            # processed sets the event again and forces an immediate
            # re-check instead of being missed.
            self.e.clear()
            max_wait = self.s.run(blocking=False)
def monPrint(inData, start):
    """Print the whole seconds elapsed since *start*, followed by *inData*.

    Args:
        inData: Value to display after the elapsed time.
        start: Reference ``time.time()`` timestamp to measure from.
    """
    elapsed_seconds = int(time.time() - start)
    print(elapsed_seconds, inData)
if __name__ == '__main__':
    scheduler = myAsyncSched()

    # Drive the scheduler loop from a background daemon thread.
    worker = threading.Thread(target=scheduler.run, daemon=True)
    worker.start()

    # Initial tasks as (delay in seconds, value to print).
    initial_tasks = ((2, "1"), (4, "2"), (8, "3"), (1, "0.5"))
    for delay, label in initial_tasks:
        scheduler.addTask(delay, monPrint, label)
    scheduler.rmTask("2")

    time.sleep(5)
    # Add an element that is due before the last remaining one.
    scheduler.addTask(0, monPrint, "2.5")
    scheduler.rmTask("3")

    time.sleep(5)
    scheduler.stop()  # Stop after the second pause
    worker.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment