Skip to content

Instantly share code, notes, and snippets.

@raylu
Last active June 21, 2016 05:55
Show Gist options
  • Save raylu/937a339c52a919fd8fda8447c8450fde to your computer and use it in GitHub Desktop.
Save raylu/937a339c52a919fd8fda8447c8450fde to your computer and use it in GitHub Desktop.
distributed job queue
Examples of distributed (job) queues:
* http://python-rq.org/
* https://github.com/twitter-archive/kestrel/wiki
* http://nsq.io/overview/quick_start.html
* http://gearman.org/
* https://kafka.apache.org/documentation.html#quickstart
* http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html#application
and many more inferior queues: http://queues.io/
So you want to write a distributed job queue?
Start by running queue.py and `curl localhost:8000/do_stuff -X POST`. Then, run worker.py.
Once that's working:
1. make the worker run indefinitely (by the way, wsgiref is single-threaded)
2. retry if a worker gets an exception while running a job (exponential backoff)
3. retry if a worker times out (the machine it's running on suffers network/hardware failure)
4. write a client library that doesn't involve globals()
5. provide a way to delay execution
Once you've done the easy stuff:
1. persist the queue so that it survives a server restart
2. distribute the queue servers
3. support multiple queues
4. support fanout queues ( http://www.bitmechanic.com/2011/12/30/reasons-to-use-message-queue.html )
#!/usr/bin/env python3
import collections
import wsgiref.simple_server
queue = collections.deque()
def app(environ, start_response):
method = environ['REQUEST_METHOD']
if method == 'POST':
function = environ['PATH_INFO'][1:]
queue.append(function)
response = 'enqueued %s!' % function
elif method == 'GET':
response = queue.popleft()
start_response('200 OK', [('Content-Type', 'text/plain')])
return [response.encode('utf-8')]
def main():
server = wsgiref.simple_server.make_server('0.0.0.0', 8000, app)
server.serve_forever()
if __name__ == '__main__':
main()
#!/usr/bin/env python3
import time
import urllib.request
def do_stuff():
time.sleep(5)
print('done with stuff!')
def main():
with urllib.request.urlopen('http://localhost:8000/') as r:
function = r.read().decode('utf-8')
print('got', function)
globals()[function]()
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment