@nfaggian
Last active August 10, 2016 12:21
Simple remote simulator executor
"""
Copyright Bureau Of Meteorology.
This software is provided under license "as is", without warranty of any
kind including, but not limited to, fitness for a particular purpose. The
user assumes the entire risk as to the use and performance of the software.
In no event shall the copyright holder be held liable for any claim, damages
or other liability arising from the use of the software.
"""
import json
import tornado.ioloop
import tornado.web
import tornado.escape
import tornado.queues
import tornado.gen
import concurrent.futures
import time
import uuid
# A simple mechanism to run and manage jobs. If max_workers is omitted, the
# pool defaults to the number of cores.
# https://docs.python.org/3.2/library/concurrent.futures.html#module-concurrent.futures
EXECUTOR = concurrent.futures.ProcessPoolExecutor(max_workers=2)
# Dictionary that maps job identifiers to their futures
SIMULATIONS = {}


def future_state(future):
    """
    String representation of a future's state.
    """
    if future.cancelled():
        return 'cancel'
    elif future.running():
        return 'running'
    elif future.done():
        return 'finished'
    else:
        return 'waiting'
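

def simulator_runner(specification):
    """
    Runs the simulator and returns its output, which is then held in RAM -
    generally a bad idea.
    """
    # Call an external process that generates output (a sleep stands in here)
    time.sleep(30)
    # Capture the output, closing the file once it has been read
    with open('data.json', 'r') as handle:
        return json.load(handle)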


class JSONHandler(tornado.web.RequestHandler):
    """
    Request handler where requests and responses speak JSON.
    Derived from: https://gist.github.com/mminer/5464753
    """
    def prepare(self):
        # Use .get so a request without a Content-Type header does not raise
        content_type = self.request.headers.get("Content-Type", "")
        if content_type.startswith("application/json"):
            self.json_args = tornado.escape.json_decode(self.request.body)
        else:
            self.json_args = None


class SubmitHandler(JSONHandler):
    """
    Receives a simulation specification and queues a job.
    """
    @tornado.gen.coroutine
    def post(self):
        specification = self.json_args
        identifier = str(uuid.uuid1())
        SIMULATIONS[identifier] = EXECUTOR.submit(simulator_runner, specification)
        # Writing a dict lets Tornado serialise it and set the JSON content type
        self.write({"job-id": identifier})


class StatusHandler(tornado.web.RequestHandler):
    """
    Returns the status of every submitted job.
    """
    @tornado.gen.coroutine
    def get(self):
        self.write({k: future_state(v) for k, v in SIMULATIONS.items()})


class CancelHandler(JSONHandler):
    """
    Cancels a job that has not started yet and reports its resulting state.
    """
    @tornado.gen.coroutine
    def post(self):
        data = self.json_args
        try:
            if data["job-id"] in SIMULATIONS:
                future = SIMULATIONS[data["job-id"]]
                # cancel() returns False and does nothing if the job is running
                future.cancel()
                self.write({"job-id": data["job-id"],
                            "status": future_state(future)})
            else:
                self.write({"error": "invalid-job"})
        except Exception as error:
            self.write({"error": str(error)})
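

class RetrieveHandler(JSONHandler):
    """
    Retrieves the output of a completed job.
    """
    @tornado.gen.coroutine
    def post(self):
        data = self.json_args
        try:
            if data["job-id"] in SIMULATIONS:
                future = SIMULATIONS[data["job-id"]]
                if future.done():
                    # Pull data from the job
                    self.write(future.result())
                else:
                    # result() would block the IOLoop until the job ends,
                    # so report the current state instead
                    self.write({"job-id": data["job-id"],
                                "status": future_state(future)})
            else:
                self.write({"error": "invalid-job"})
        except Exception as error:
            self.write({"error": str(error)})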


if __name__ == "__main__":
    app = tornado.web.Application([
        (r"/submit", SubmitHandler),
        (r"/status", StatusHandler),
        (r"/cancel", CancelHandler),
        (r"/retrieve", RetrieveHandler),
    ], debug=True)
    app.listen(9999)
    tornado.ioloop.IOLoop.current().start()
nfaggian commented Aug 10, 2016

A basic Tornado-based job scheduler built on concurrent.futures.
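The status strings the scheduler reports come straight from the state of each future. Here is a minimal sketch of that lifecycle. It makes two assumptions that are not in the gist: it swaps in a one-worker `ThreadPoolExecutor` so the ordering is easy to control, and the `demo` and `blocking_job` names are hypothetical. A process pool may behave a little differently right at the boundary between queued and running.

```python
import concurrent.futures
import threading


def future_state(future):
    """Map a concurrent.futures.Future onto the status strings used in the gist."""
    if future.cancelled():
        return 'cancel'
    elif future.running():
        return 'running'
    elif future.done():
        return 'finished'
    return 'waiting'


def demo():
    """Walk two jobs through a one-worker pool and record what happens."""
    started = threading.Event()   # set once the first job is executing
    release = threading.Event()   # set to let the first job finish

    def blocking_job():
        started.set()
        release.wait()
        return {"ok": True}

    seen = {}
    # Assumption: a thread pool stands in for the gist's process pool.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        first = pool.submit(blocking_job)
        second = pool.submit(blocking_job)   # queued behind the first job
        started.wait()
        seen["first"] = future_state(first)
        seen["second"] = future_state(second)
        seen["cancel_running"] = first.cancel()   # a running job cannot be cancelled
        seen["cancel_queued"] = second.cancel()   # a queued job can
        seen["second_after_cancel"] = future_state(second)
        release.set()
        seen["first_result"] = first.result()
        seen["first_after_result"] = future_state(first)
    return seen
```

The point of the sketch is the asymmetry in `cancel()`: it only succeeds for a job that is still waiting in the queue.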

  • submit queues a job for execution. The payload is a JSON document, which could be the parameterization for the simulator. Submit returns a job-id.
  • status returns a dictionary that maps each job-id to the state of that job.
  • cancel cancels a submitted job that has not started running; a job that is already running cannot be stopped this way. The payload is a JSON document with a job-id specified.
  • retrieve pulls the data from a finished job. The request payload is a JSON document with a job-id specified; the response is the job's output, a GeoJSON document such as the attached data.json file.
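For anyone wanting to try the four endpoints, a client could look roughly like the sketch below. It uses only the standard library. `BASE_URL`, `build_request`, `call` and `run_job` are hypothetical names, and it assumes the server above is running locally on port 9999.

```python
import json
import time
import urllib.request

# Assumption: the server from the gist is running on this machine.
BASE_URL = "http://localhost:9999"


def build_request(endpoint, payload=None):
    """Build a request for one endpoint; a payload turns it into a JSON POST."""
    url = "{}/{}".format(BASE_URL, endpoint)
    if payload is None:
        return urllib.request.Request(url)
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        url, data=body, headers={"Content-Type": "application/json"})


def call(endpoint, payload=None):
    """Send the request and decode the JSON response."""
    with urllib.request.urlopen(build_request(endpoint, payload)) as response:
        return json.loads(response.read().decode("utf-8"))


def run_job(specification, poll_seconds=1.0):
    """Submit a specification, poll until the job settles, then fetch its output."""
    job_id = call("submit", specification)["job-id"]
    while call("status")[job_id] in ("waiting", "running"):
        time.sleep(poll_seconds)
    return call("retrieve", {"job-id": job_id})
```

Calling `run_job({"some": "parameters"})` would submit, wait and return whatever retrieve sends back. Polling status first keeps the client from asking for output before the job has finished.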
