@tapanpandita
Created May 6, 2012 16:54
Task queuing in Django with ZeroMQ

Introduction

Here at Glitterbug Tech, Django is the lifeline of everything we make and we are big fans! Django doesn't get in the way and lets us write the language we love, Python, while providing an amazing set of tools that makes creating web applications fast and fun. But, as you know, a view runs synchronously: all of its code has to finish before you can send back a response. This means that when you are generating a report from a database table with a million rows, your client has to wait while your application huffs and puffs trying to get everything ready (unless the request times out, in which case you receive angry calls while you try to explain what "502 Bad Gateway" means). Sometimes you just want to make your site more responsive by pushing time-consuming tasks into the background ("Your comment has been posted!" while a zmq process works in the backend adding it to your db/caching layer/pushing it to followers/creating rainbows). What you need here is some of that asynchronous, non-blocking secret sauce. No, I don't mean Node.js, I mean ZeroMQ!

### What is ZeroMQ?

ZeroMQ is a high-performance asynchronous messaging library aimed at use in scalable distributed or concurrent applications. It provides a message queue, but unlike message-oriented middleware, a ØMQ system can run without a dedicated message broker. The library is designed to have a familiar socket-style API.

ZeroMQ provides you with a much lower-level API than most other queuing systems. The basic ZeroMQ unit is a ZeroMQ socket which, according to the zguide, is:

A ZeroMQ socket is what you get when you take a normal TCP socket, inject it with a mix of radioactive isotopes stolen from a secret Soviet atomic research project, bombard it with 1950-era cosmic rays, and put it into the hands of a drug-addled comic book author with a badly-disguised fetish for bulging muscles clad in spandex. Yes, ZeroMQ sockets are the world-saving superheroes of the networking world.

ZeroMQ sockets provide a socket-like API while doing a lot of heavy lifting under the hood (like managing buffers, reconnections, waiting for the host to appear, etc.). You can read up on all the different kinds of sockets on the zguide page. Push-Pull sockets are the ones that we will be using. A push socket, as the name suggests, is a socket that pushes out messages, and a pull socket receives messages from a push socket. The messages pushed out are distributed round-robin among the connected pull sockets.
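
To make the round-robin idea concrete, here is a tiny pure-Python simulation. It does not use ZeroMQ at all; the function name and the worker labels are made up for illustration, and a real push socket only hands messages to pull sockets that are connected at that moment, which this sketch ignores.

```python
from itertools import cycle


def distribute_round_robin(messages, workers):
    """Assign each message to the next worker in turn, wrapping around."""
    assignments = {worker: [] for worker in workers}
    # cycle() repeats the worker list forever; zip() stops when messages run out
    for message, worker in zip(messages, cycle(workers)):
        assignments[worker].append(message)
    return assignments


if __name__ == '__main__':
    # Five messages shared between two hypothetical pull sockets
    print(distribute_round_robin(['a', 'b', 'c', 'd', 'e'], ['pull-1', 'pull-2']))
```

With two workers, the first one ends up with every other message starting from the first, and the second one gets the rest. Nobody decides which worker is "free"; the work is simply dealt out like cards.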

The architecture that we will be using is:

  • We define all tasks we will need to perform in a file named "tasks.py".
  • A master ZeroMQ process binds on two ports, one to pull and one to push.
  • The application (or applications, if you are running multiple instances) pushes tasks to the master ZeroMQ process.
  • The master process pushes the tasks it receives out to the connected slave workers, which execute them.

Enough talk, let's look at some code!

The Code

### ZMQ Master

This is the code for the master zmq process.

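import logging

import zmq

# Log to stderr so the process manager can capture errors
logging.basicConfig(level=logging.INFO)
error_logger = logging.getLogger('zmqmaster')

context = zmq.Context()
# Applications connect here and push tasks in
pull_socket = context.socket(zmq.PULL)
pull_socket.bind('tcp://127.0.0.1:5000')
# Workers connect here and tasks are pushed out to them
push_socket = context.socket(zmq.PUSH)
push_socket.bind('tcp://127.0.0.1:5001')

try:
    while True:
        try:
            task_data = pull_socket.recv_json()
            push_socket.send_json(task_data)
        except Exception as e:
            # Log and keep going; one bad message should not stop the queue
            error_logger.error(e)
finally:
    # Runs when the loop is interrupted, e.g. by Ctrl-C
    pull_socket.close()
    push_socket.close()
    context.term()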

That's all! Now it is up and ready to do its task distribution magic!

### views.py

Here is what your application needs to do to send tasks to the master. You specify the name of the task you want to call, and the arguments that need to be passed to it.

import zmq
from django.http import HttpResponse

ctx = zmq.Context()
task_socket = ctx.socket(zmq.PUSH)
task_socket.connect('tcp://127.0.0.1:5000')

def some_view(request, *args, **kwargs):
    # view stuff (var1 and var2 are placeholders computed here)
    # Kwargs that need to be passed to the task function.
    # They are sent as JSON, so the values must be JSON-serializable.
    task_kwargs = {
        'var1': var1,
        'var2': var2,
    }
    task_socket.send_json({
        'task': 'some_task',  # name of the task function that needs to be called with the given arguments
        'task_kwargs': task_kwargs,
    })
    return HttpResponse('The task you requested will be completed shortly.')
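
Because the message travels as JSON, anything that is not JSON-serializable (a model instance, a datetime) will blow up at send time. A small helper can catch that early. This is only a sketch: `build_task_message` is a hypothetical name of ours, not part of pyzmq or Django.

```python
import json


def build_task_message(task_name, **task_kwargs):
    """Return the message dict a worker expects, failing early on bad values."""
    message = {'task': task_name, 'task_kwargs': task_kwargs}
    # json.dumps raises TypeError for values it cannot encode,
    # so the problem surfaces in the view rather than in the queue
    json.dumps(message)
    return message


if __name__ == '__main__':
    print(build_task_message('some_task', var1=1, var2='two'))
```

In the view you would then call `task_socket.send_json(build_task_message('some_task', var1=var1, var2=var2))`. Passing primary keys rather than whole objects keeps the messages small and serializable.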

### ZMQ Slave

The slave connects to the master and imports the tasks it needs to perform. On receiving a message, it looks up the relevant task function and calls it with the given arguments.

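import logging

import zmq

import tasks

# Log to stderr so the process manager can capture errors
logging.basicConfig(level=logging.INFO)
error_logger = logging.getLogger('zmqslave')

context = zmq.Context()
socket = context.socket(zmq.PULL)
socket.connect('tcp://127.0.0.1:5001')

try:
    while True:
        try:
            task_data = socket.recv_json()
            task = task_data.pop('task')
            task_kwargs = task_data.pop('task_kwargs')
            # Look up the task function by name in tasks.py and call it
            getattr(tasks, task)(**task_kwargs)
            print("task done")
        except Exception as e:
            # Log and move on to the next task
            error_logger.error(e)
finally:
    # Runs when the loop is interrupted, e.g. by Ctrl-C
    socket.close()
    context.term()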
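
One thing to keep in mind: `getattr(tasks, task)` will happily call anything that lives in the tasks module, including names it merely imported. If that worries you, an explicit registry is one way to limit what a message can trigger. The sketch below is an assumption of ours rather than part of the setup above; `TASK_REGISTRY`, `task`, `dispatch` and `add` are all illustrative names.

```python
TASK_REGISTRY = {}


def task(func):
    """Decorator that registers a function so workers may run it by name."""
    TASK_REGISTRY[func.__name__] = func
    return func


def dispatch(task_data):
    """Look up the named task in the registry and call it with its kwargs."""
    name = task_data['task']
    kwargs = task_data.get('task_kwargs', {})
    if name not in TASK_REGISTRY:
        raise ValueError('Unknown task: %r' % (name,))
    return TASK_REGISTRY[name](**kwargs)


@task
def add(x, y):
    # Example task; a real one would do the slow work
    return x + y


if __name__ == '__main__':
    print(dispatch({'task': 'add', 'task_kwargs': {'x': 2, 'y': 3}}))
```

The worker loop would then call `dispatch(task_data)` instead of `getattr`, and only functions marked with `@task` can be reached from a message.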

### tasks.py

We define the tasks that the slaves need to work on here.

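import os

import django

# Point Django at the project's settings before importing any models
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'settings')
django.setup()  # Django 1.7+; older releases used setup_environ(settings) instead

from app.models import *

def some_task(*args, **kwargs):
    # do some work here
    pass

def some_other_task(*args, **kwargs):
    # do some other work here
    pass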

### Supervisor Config

Supervisor is an excellent process control system that keeps our master and slaves up and running at all times. We just add a few lines to the config file with whatever options we need. Notice that in the "zmqslave" config, we are starting 5 slave processes. You can raise this number to suit your workload; a reasonable rule of thumb is about 2*(Number of CPUs) + 1.

[program:zmqmaster]
command=/usr/bin/python /path/to/app/dir/scripts/zmqmaster.py
directory=/path/to/app/dir/
user=root
numprocs=1
autostart=true
autorestart=true
redirect_stderr=True

[program:zmqslave]
command=/usr/bin/python /path/to/app/dir/scripts/zmqslave.py
directory=/path/to/app/dir/
user=root
process_name=%(program_name)s%(process_num)s
numprocs=5
autostart=true
autorestart=true
redirect_stderr=True
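
If you would rather compute the worker count than guess it, the rule of thumb is easy to turn into a few lines of Python. This is just a sketch of the formula; `suggested_worker_count` is our own name, and the right number for your workload may well differ.

```python
import multiprocessing


def suggested_worker_count(cpu_count=None):
    """Apply the 2 * CPUs + 1 rule of thumb for the number of workers."""
    if cpu_count is None:
        # Fall back to the number of CPUs on this machine
        cpu_count = multiprocessing.cpu_count()
    return 2 * cpu_count + 1


if __name__ == '__main__':
    print(suggested_worker_count())
```

By this formula, the `numprocs=5` used for the slaves corresponds to a two-CPU machine.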

And that's all, really! We are ready to go asynchronous!

Conclusion

ZeroMQ is an excellent transport layer and can scale easily to multiple machines. It also has powerful abstractions for building highly scalable, distributed and complex architectures, which makes it an excellent choice for a task distribution system like this. That said, we have barely touched on the architecture needed to make the system robust. We still need a persistence layer to save tasks until they are done, a way to mark tasks as done (or failed), a mechanism to kill processes that take too long and pass their task to another process, a way to schedule tasks to run at a specific time, and a way to retry tasks that fail. There is a lot of scope for improvement here. This is still a work in progress, but we are hoping to come out with a Python module which will deliver all these improvements and more. Once we have taken certain design decisions, the code will be up on GitHub for the community to improve on it.
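
To give a flavour of the simplest of these improvements, here is a rough sketch of retrying a failed task inside a worker. It is not the module mentioned above, just an illustration: `run_with_retries` and its parameters are hypothetical, and it retries on any exception, which a real system would probably want to narrow down.

```python
import time


def run_with_retries(func, kwargs, max_attempts=3, delay_seconds=0.0):
    """Call func(**kwargs), retrying on any exception up to max_attempts times."""
    if max_attempts < 1:
        raise ValueError('max_attempts must be at least 1')
    for attempt in range(1, max_attempts + 1):
        try:
            return func(**kwargs)
        except Exception:
            if attempt == max_attempts:
                # Out of attempts: let the last error propagate to the caller
                raise
            time.sleep(delay_seconds)


if __name__ == '__main__':
    print(run_with_retries(lambda x: x * 2, {'x': 21}))
```

A worker could wrap its task call in this helper, so that a flaky task gets a few chances before the error is logged.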
