Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
A multiprocess task broker which accepts and provides status reports for tasks using JSON REST API calls.

Multiprocess Task Broker with REST API

This gist shows and example or an asynchronous multiprocess task broker which can take job requests and report on running jobs via a minimal REST API.

Adapted from https://gist.github.com/nitaku/10d0662536f37a087e1b

All of the caveats from the original author still apply.

from multiprocessing import Pool, Manager
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
import SocketServer
import json
import cgi
import random, time
# Resources to read
#
# http://stackoverflow.com/a/1239252/603280
# http://stackoverflow.com/questions/13689927/how-to-get-the-amount-of-work-left-to-be-done-by-a-python-multiprocessing-pool
#
# This is your task which will run its its own process. You can modify it to have your desired args and kwargs and
# report back to the manager dictionary 'd' or any other manager resource as necessary.
# Read the documentation for multiprocessing to find the facilities provided by multiprocess.Manager
def task(d, sessionid, number, repeatcount):
"""
This function is a stub funciton which is incrementing a success counter in a random time between 0 to 2 seconds
with a 2% chance that the failure counter will be incremented instead of the success. After each success or failure
the manager dictionary 'd' is updated with the session's progress.
You should replace this with an actual task and update necessary information about the task into the manager dict
"""
success = 0
fail = 0
while success+fail<repeatcount:
time.sleep(random.random()*2.0)
if (random.random()*100)>98.0:
fail+=1
else:
success+=1
d[sessionid] = {
'success': success,
'fail': fail,
'number': number,
'repeatcount': repeatcount
}
return
# Initializing our global resources.
p = Pool()
m = Manager()
d = m.dict()
# This is the HTTP Server which provides a simple JSON REST API
class Server(BaseHTTPRequestHandler):
def _set_headers(self):
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.end_headers()
def do_HEAD(self):
self._set_headers()
# GET sends back the complete contents of the manager dictionary 'd' as JSON.
# This can be modified to any desired response (should be JSON)
def do_GET(self):
self._set_headers()
self.wfile.write(json.dumps(d))
# POST echoes the message adding a JSON field
def do_POST(self):
ctype, pdict = cgi.parse_header(self.headers.getheader('content-type'))
# refuse to receive non-json content
if ctype != 'application/json':
self.send_response(400)
self.end_headers()
return
# read the message and convert it into a python dictionary
length = int(self.headers.getheader('content-length'))
message = json.loads(self.rfile.read(length))
if message.has_key('runtask'):
"""
To run a new task simply send the following JSON as POST:
{"runtask": true, "sessionid": "ANY-UNIQUE-NAME-FOR-YOUR-TASK", 'arg1', 'repeatcount'}
Curl Syntax:
curl --data "{\"runtask\":\"true\", \"sessionid\":\"session-5\", \"number\":\"+\", \"repeatcount\": 100 }" \
--header "Content-Type: application/json" http://localhost:8111
"""
print "Starting task with %s, %s, %s" % (message['sessionid'], message['number'], message['repeatcount'])
result = p.apply_async(task, (d, message['sessionid'], message['number'], message['repeatcount']))
elif message.has_key('sessionid'):
"""
To see the status of a currently running task (or completed task) simpley POST the following JSON
{"sessionid": "THE-UNIQUE-NAME-FOR-YOUR-TASK"}
Curl Syntax:
curl --data "{\"sessionid\":\"session-5\"}" --header "Content-Type: application/json" http://localhost:8111
"""
message['status'] = d[message['sessionid']]
# send the message back
self._set_headers()
self.wfile.write(json.dumps(message))
def run(server_class=HTTPServer, handler_class=Server, port=8111):
server_address = ('', port)
httpd = server_class(server_address, handler_class)
print 'Starting httpd on port %d...' % port
httpd.serve_forever()
if __name__ == "__main__":
# Run the task broker.
run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment