Skip to content

Instantly share code, notes, and snippets.

@sunilmallya
Created October 9, 2013 17:24
Show Gist options
  • Save sunilmallya/6904943 to your computer and use it in GitHub Desktop.
Save sunilmallya/6904943 to your computer and use it in GitHub Desktop.
'''
A non-blocking task scheduler and executor implemented using coroutines and Tornado.
Tornado acts as the non-blocking listener; the two coroutines are the task scheduler and the task executor.
'''
import tornado.ioloop
import tornado.web
import tornado.httpserver
import tornado.gen
import time
from socket import *
from heapq import *
import random
import itertools
############################################################
## Priority queue implementation (module-level state shared by
## add_task / remove_task / pop_task / peek_task)
############################################################
# Heap (maintained with heapq) of [priority, count, task] entries.
pq = []
# Maps each queued task to its heap entry so the entry can be found and
# tombstoned without searching the heap.
entry_finder = {}
# Sentinel written into an entry's task slot when the task is removed;
# pop_task() skips entries carrying it.
REMOVED = '<removed-task>'
# Monotonically increasing sequence number stored in every entry, used as
# the tie-breaker between entries of equal priority.
counter = itertools.count()
def add_task(task, priority=0):
    """Queue *task* at *priority*, superseding any entry already held for it.

    task     -- hashable object identifying the work item.
    priority -- sort key in the heap; smaller values are popped first.
    """
    # A task that is queued again replaces its earlier entry, which is
    # tombstoned in place rather than dug out of the heap.
    if task in entry_finder:
        remove_task(task)
    # The sequence number orders entries of equal priority by insertion and
    # keeps heap comparisons from ever reaching the task object itself.
    new_entry = [priority, next(counter), task]
    heappush(pq, new_entry)
    entry_finder[task] = new_entry
def remove_task(task):
    """Tombstone the queued entry for *task*.

    Raises KeyError when *task* is not currently queued.
    """
    # The entry itself stays in the heap; only its task slot is overwritten
    # so that pop_task() can recognise and skip it later.
    entry_finder.pop(task)[-1] = REMOVED
def pop_task():
    """Remove and return the task with the smallest priority value.

    Tombstoned entries encountered on the way are discarded.
    Raises KeyError when no live task is left in the queue.
    """
    while pq:
        candidate = heappop(pq)[-1]
        if candidate is REMOVED:
            # Entry was cancelled by remove_task(); drop it and keep looking.
            continue
        del entry_finder[candidate]
        return candidate
    raise KeyError('pop from an empty priority queue')
def peek_task():
    """Return the front live entry of the queue without removing it.

    Returns the [priority, count, task] entry with the smallest priority, or
    (None, None, None) when no live task is queued.

    Tombstoned entries sitting at the front of the heap are discarded first.
    Otherwise a caller could read the priority of a cancelled entry and then
    have pop_task() hand back a different task (or raise KeyError).
    """
    # Entries marked REMOVED are no longer referenced by entry_finder, so
    # dropping them here loses nothing.
    while pq and pq[0][-1] is REMOVED:
        heappop(pq)
    if pq:
        return pq[0]
    return (None, None, None)
################################################################
# CO ROUTINES
################################################################
# The Process logic, to schedule a task
def task_scheduler(delay=2):
    """Coroutine that queues every task sent to it.

    Each received task is added to the priority queue with priority
    ``time.time() + delay``; check_scheduler() compares that value with the
    current time to decide when the task is due.

    delay -- seconds added to the receipt time to form the priority
             (default 2, the value that used to be hard-coded).

    The generator must be advanced to its first ``yield`` before send() is
    used. Closing it prints a short exit message.
    """
    try:
        while True:
            task = (yield)
            # The priority is the wall-clock time at which the task becomes due.
            add_task(task, time.time() + delay)
    except GeneratorExit:
        print("Exit Task Scheduler")
# Check scheduler
def check_scheduler():
    """Hand every task that is currently due to the executor coroutine.

    A task is due when the priority of the front queue entry is less than or
    equal to the current time. All due tasks are dispatched in one call, so a
    burst of submissions is not throttled to one task per invocation.
    """
    print("[check scheduler]")
    while True:
        priority, _count, _task = peek_task()
        # Compare against None explicitly: a priority of 0 is a valid
        # (always-due) value and must not be mistaken for "queue empty".
        if priority is None or priority > time.time():
            return
        try:
            task = pop_task()
        except KeyError:
            # Nothing live was left behind the peeked entry.
            return
        texecutor.send(task)
# Task Executor
def task_executor():
    """Coroutine that "executes" each task sent to it by printing it.

    For every received task one line is printed containing the task and the
    current time. The generator must be advanced to its first ``yield``
    before send() is used. Closing it prints a short exit message.
    """
    try:
        while True:
            task = (yield)
            # Single pre-formatted string: same text as the old print
            # statement (note the two spaces after the arrow), but valid
            # with print as a function as well.
            print("[Task exec] -->  %s %s" % (task, time.time()))
    except GeneratorExit:
        print("Exit Task Executor")
# Create both coroutines and prime them: each must be advanced to its first
# `yield` before it can accept values through send(). The builtin next()
# is used (as for the sequence counter above) instead of the Python 2-only
# generator .next() method.
tscheduler = task_scheduler()
next(tscheduler)
texecutor = task_executor()
next(texecutor)
###########################################
# Create Tornado server application
###########################################
from tornado.options import define, options

# Command-line option --port: the port passed to server.listen() in main().
define("port", type=int, default=8888, help="run on the given port")
class GetData(tornado.web.RequestHandler):
    """Request handler that forwards each POST body to the task scheduler."""

    # No @tornado.web.asynchronous here: the response is finished
    # synchronously inside post(), so the decorator is unnecessary, and it
    # is no longer available in Tornado 6.
    def post(self, *args, **kwargs):
        """Send the raw request body to the scheduler coroutine and reply.

        The body is used as the task itself; the response is empty.
        """
        recv_data = self.request.body
        tscheduler.send(recv_data)
        self.finish()
# URL routing table: the root path is served by GetData.
application = tornado.web.Application(handlers=[(r"/", GetData)])
def main():
    """Parse command-line options, start the HTTP server and run the IOLoop.

    Also registers check_scheduler() to be called every 1000 ms. Does not
    return until the IOLoop is stopped.
    """
    tornado.options.parse_command_line()

    http_server = tornado.httpserver.HTTPServer(application)
    http_server.listen(options.port)

    # Poll the priority queue once per second for tasks that have become due.
    poller = tornado.ioloop.PeriodicCallback(check_scheduler, 1000)
    poller.start()

    io_loop = tornado.ioloop.IOLoop.instance()
    io_loop.start()


# ============= MAIN ======================== #
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment