As I may have mentioned, I've been working on a way to integrate our existing MQWorker infrastructure with celery. This would give us the suite of management/monitoring/remote-control tools that comes with celery, and presumably all future python async workers would be written in celery. So here we go:
# Integrating Celery ↔ MQWorkers
## Celery uses [webhooks](http://docs.celeryproject.org/en/latest/userguide/remote-tasks.html) in order to call functions written in other languages (e.g. MQWorker)
1. Run a starman instance that takes the name of the MQWorker class as a path element in an HTTP GET URL, e.g. http://127.0.0.1:8181/StampPDF will cause an instance of bepress::MQWorker::StampPDF to be instantiated.
   1. (α/β-quality) code for a .psgi that does this is checked in [here](https://fisheye.bepress.com/browse/main/trunk/psgi/worker.psgi)
   2. `start_server --port 8181 --signal-on-hup=QUIT --signal-on-term=QUIT -- /path/to/starman --workers 4 --preload-app $FILETREE/psgi/worker.psgi`
   3. On worker success, the .psgi file returns a 200 code and a JSON string like:
````{"status": "success", "retval": "I did ur work! context_key 12345 is done"}````
On failure, also return a 200 but with a JSON string like:
````{"status": "failure", "reason": "Invalid moon alignment."}````
2. To call into this worker from python, we need to define a decorated task function like this:
````
@celery.task
def mqtask(mq_class, msg):
    url = celery.task.http.URL('http://127.0.0.1:8181/{0}'.format(mq_class))
    res = url.post_async(msg=msg)  # fire off an async POST request; res is a Future
    return res
````
The [effect](http://docs.celeryproject.org/en/latest/userguide/tasks.html#custom-task-classes) of decorating the `mqtask()` function with `@celery.task` is (roughly) to create a sub-class of `celery.Task` whose `run()` method is the decorated function. It is possible to sub-class `celery.Task` directly, but this is not the recommended way to create celery tasks, and doing so bypasses a lot of the magic that happens behind the scenes. If at all possible, stick with decorating a single function and let celery create your sub-classes for you.
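Conceptually (a simplified illustration only, not celery's actual implementation), the decorator turns the function above into something like:
````
# Rough sketch of what @celery.task effectively produces; real celery
# also handles registration, binding, task options, etc.
class MqtaskTask(celery.Task):
    name = 'celery_test.tasks.mqtask'  # auto-generated from module + function name

    def run(self, mq_class, msg):
        url = celery.task.http.URL('http://127.0.0.1:8181/{0}'.format(mq_class))
        return url.post_async(msg=msg)
````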
3. To kick off a celery job that calls this HTTP endpoint (which in turn performs the MQWorker task), run the following python command:
````
mqtask.delay(mq_class='StampPDF', msg='{"context_key":12345}')
````
4. Assuming you've changed the celery task serializer to JSON (it defaults to python pickle), calling `mqtask.delay()` as above will result in a message like this getting added to RabbitMQ:
````
{
    "args": [],
    "callbacks": null,
    "chord": null,
    "errbacks": null,
    "eta": null,
    "expires": null,
    "id": "e0e34589-37fa-436c-ae65-251a3f36595f",
    "kwargs": {
        "mq_class": "StampPDF",
        "msg": "{\"context_key\":12345}"
    },
    "retries": 0,
    "task": "celery_test.tasks.mqtask",
    "taskset": null,
    "utc": true
}
````
5. Assuming you've pip-installed `django-celery`, which integrates celery with django's settings, paths, etc., and have added `djcelery` to your django `INSTALLED_APPS` list, you can use django's `manage.py` to kick off the celery worker threads. Using daemontools/runit/supervisor/upstart, run the celery workers in a separate process with a `run` command that looks like:
````
exec /path/to/virtualenv/bin/python /path/to/manage.py celery worker -n worker_name_string -Q celery,listen_queue_1,listen_queue_2 --loglevel=INFO --concurrency 4
````
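For reference, the django-celery side of that setup might look like this in `settings.py` (a sketch based on django-celery's documented `setup_loader()` hook; the broker URL is an example, and the serializer setting is the one step 4 above assumes):
````
# settings.py (sketch)
import djcelery
djcelery.setup_loader()  # wires celery into django settings and task auto-discovery

INSTALLED_APPS += ('djcelery',)

BROKER_URL = 'amqp://guest:guest@127.0.0.1:5672//'  # RabbitMQ; adjust credentials/vhost
CELERY_TASK_SERIALIZER = 'json'  # default is pickle; the JSON blobs above assume this
````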
6. The celery worker threads will pick up the RabbitMQ JSON message from above and will look up the task `celery_test.tasks.mqtask` in the celery registry (which means it is possible to name the task something more specific for metrics/human consumption and for setting rate limits), then fetch the `celery.Task` object for that name from the registry. The kwargs from the JSON message are passed to that object's `run()` method, which, as noted above, is just the `@celery.task`-decorated function `mqtask()`. In other words, our `mqtask()` function gets called with these kwargs from the JSON blob: `{"mq_class":"StampPDF","msg":"{\"context_key\":12345}"}`
7. Given those kwargs, and following the implementation defined above, `mqtask()` will then call
`celery.task.http.URL('http://127.0.0.1:8181/StampPDF').post_async(msg='{"context_key":12345}')`
1. Calling `post_async()` fires off another asynchronous celery job with a JSON blob that looks like:
````
{
    "args": [
        "http://127.0.0.1:8181/StampPDF",
        "POST"
    ],
    "callbacks": null,
    "chord": null,
    "errbacks": null,
    "eta": null,
    "expires": null,
    "id": "3c85de1a-27f6-439d-a205-d858a257ca40",
    "kwargs": {
        "msg": "{\"context_key\":12345}"
    },
    "retries": 0,
    "task": "celery.task.http.HttpDispatchTask",
    "taskset": null,
    "utc": true
}
````
2. In other words, our initial `mqtask()` task implementation doesn't actually do work; it simply emits another celery task that is responsible for hitting our Starman HTTP endpoint. That task (the `celery.task.http.HttpDispatchTask` task fired off when the celery workers process the initial `mqtask()` call asynchronously) is responsible for actually making the HTTP request and translating the HTTP response into a celery response.
3. This extra round-trip through RabbitMQ is obviously silly, and it would be nice if there were non-`_async()` versions of `get_async()` and `post_async()` in `celery.task.http.URL` -- but there aren't.
4. By inspecting the source of `celery.task.http.URL.get/post_async()` it appears as if this one-liner would do the HTTP request and translation in-process:
`celery.task.http.URL("http://www.google.com").dispatcher.apply(args=['http://www.google.com','GET'])`
5. The behavior we want is for our `mqtask()` job to make the HTTP call in-process rather than making a second round-trip through RabbitMQ
6. The clearest way to get the behavior we want is to sub-class `celery.task.http.URL` and implement our own `get/post_async()` following the pattern described [here](http://docs.celeryproject.org/en/latest/userguide/calling.html#example) for translating between `delay()` calls and `apply_async()` calls. `delay()` is just a slimmed-down version of `apply_async()` (in fact `delay()` calls `apply_async()`) that does not accept as many options/parameters. The reason to write our own `get/post_async()` using `apply_async()` instead of `delay()` is that doing so lets us simultaneously write synchronous `get()`/`post()` implementations that call celery's `apply()` method, which has the same calling semantics as `apply_async()` but does the work in-process/synchronously. Thus we will have a worker that doesn't need two trips through RabbitMQ. I will ask on the celery mailing list whether this is really what we should be doing and whether they would accept a patch to mainline this functionality.
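A minimal sketch of what that subclass might look like, based on the `dispatcher.apply()` one-liner above (untested; this is the `myURL` class the `gen_mqtask()` example below assumes):
````
from celery.task.http import URL

class myURL(URL):
    # dispatcher is celery.task.http.HttpDispatchTask; apply() has the same
    # calling semantics as apply_async() but runs the task in-process, so no
    # second trip through RabbitMQ is needed.
    def get(self, **kwargs):
        return self.dispatcher.apply(args=[str(self), 'GET'], kwargs=kwargs)

    def post(self, **kwargs):
        return self.dispatcher.apply(args=[str(self), 'POST'], kwargs=kwargs)
````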
## Segregating/rate-limiting jobs
Remember that the celery workers aren't actually doing the work; they're just making HTTP calls to our Starman endpoints, which are doing the work. So, from a celery point of view, we could happily fire up 500 simultaneous StampPDF requests, since those requests are just cheap HTTP hits and don't actually do work themselves. Of course, those requests would block, since the pool of Starman connections would immediately be overwhelmed by 500 incoming requests.
It therefore behooves us to separate out our jobs into separate pools and to establish rate limits on those pools so that a sudden solr indexing storm doesn't, for example, starve our ExpressoCopyArticle workers.
There are three possibilities for doing this: one which is fairly analogous to our current method of rate-limiting MQWorkers, and another which uses celery's built-in management features.
The third possibility -- having our perl code emit JSON in the format that the celery workers expect -- is really just an extension of either the first or the second possibility, but one which short-circuits the need to ingest our current, short perl-generated worker JSON messages in order to translate them into celery-esque JSON.
I believe the second (using celery's featureset) is better, but I will enumerate all three possibilities here.
- - -
### Possibility the first (using AMQP routing and separate queues)
1. When defining our celery job with the `@celery.task` decorator, we can also pass in parameters with the decorator. So, for example, we can pass in an AMQP routing key like so:
````
@celery.task(routing_key='StampPDF')
````
Then, if we set up a queue on RabbitMQ (assume we name the queue `stamp-pdf`) and configure RabbitMQ so that the `stamp-pdf` queue receives any messages sent to the `celery` exchange with a `StampPDF` routing key, calls to the celery task decorated with `routing_key='StampPDF'` will result in jobs being routed to the `stamp-pdf` queue.
2. Once we've set up the routing for message delivery on RabbitMQ, it is also necessary to adjust the celery worker pool so that it listens to the stamp-pdf queue in addition to whatever other queues it is already listening to. In the `run` script listed above, this can be accomplished by appending to the `-Q` list for the supervised celery worker pool. By having some group(s) of workers listen to our new stamp-pdf queue, we can segregate the worker pools and have a fixed `--concurrency` configuration so that only, say, four simultaneous workers may execute jobs from the `stamp-pdf` queue. These jobs are, again, HTTP requests which will block while the Starman/perl side of the process is actually doing the work of stamping a PDF.
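For concreteness, the queue/routing-key wiring might look like this in the celery settings (a sketch using kombu's `Queue`/`Exchange` declarations; the names are the examples from above):
````
from kombu import Exchange, Queue

# Declare the default 'celery' queue plus a dedicated stamp-pdf queue bound
# to the 'celery' exchange with the 'StampPDF' routing key.
CELERY_QUEUES = (
    Queue('celery', Exchange('celery'), routing_key='celery'),
    Queue('stamp-pdf', Exchange('celery'), routing_key='StampPDF'),
)
````
The worker pool that should handle these jobs would then be started with `-Q celery,stamp-pdf`.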
- - -
### Possibility the second (using celery task names and celery's built-in per-name rate limiting)
Celery tasks always have a name. By default, this name is auto-generated based on the name of your task function. However, in our case, that would mean that all `mqtask` tasks would end up with the same name if we stuck with the default. Luckily, it is possible to specify the task name in the `@celery.task` decorator. So for example `@celery.task(name='StampPDF')` would create a celery task named StampPDF. The celery management tools like to use this name for displaying metrics on your tasks and, luckily for us, allow the administrator to set rate limits per-name.
So all we need to do is make sure that we have separate names for every MQWorker we are trying to manage with celery. It is worth pointing out that all names have to be determinable at compile time, since that is when the celery task registry is populated. It is of course possible to modify the celery registry at run-time, but we can get away with naming all our tasks at compile time, so there's no need to fiddle with the celery registry at run time. It's best to let the celery auto-discovery do its thing if at all possible.
1. Because we will essentially be re-using the same `@celery.task`-decorated `mqtask()` function to define several celery tasks (each with a different internal celery name), we will want to write a simple closure-generating wrapper around our `mqtask()` definition:
````
def gen_mqtask(name):
    @celery.task(name=name)
    def mqtask(mq_class, msg, routing_key=None):
        # myURL implements the post() method as defined above
        # to perform an HTTP POST in-process
        url = myURL('http://127.0.0.1:8181/{0}'.format(mq_class))
        res = url.post(msg=msg)
        return res
    return mqtask
````
2. As noted above, all names must be enumerated at compile time, so we will need to generate all of the named `mqtask()` functions at compile time using something in the global scope of our `tasks.py` module which calls our `gen_mqtask()` with all possible names:
````
all_tasks = { }
names = ['StampPDF', 'SolrIndex', 'ExpressoCopyArticle'] # these could come from django settings
for n in names:
    all_tasks[n] = gen_mqtask(n)
````
3. When we want to actually create a celery task, then, we will need to access it via this `all_tasks` dict. We'd almost certainly want to write an accessor for these celery tasks, but for purposes of simplicity in demonstration, we could access the dict directly and issue something like this in order to add a named celery job:
````
celery_test.tasks.all_tasks['StampPDF'].delay(mq_class='StampPDF', msg='{"context_key":12345}')
````
We could of course decide that for all mqtasks the task's name and its `mq_class` parameter must be the same, and remove `mq_class` from the calling signature for `mqtask()` tasks (or give it the default value of `mq_class=name` inside `gen_mqtask()`), which would make the calling signature for a task simpler, as sketched below.
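That variant might look like this (hypothetical sketch, building on `gen_mqtask()` above):
````
def gen_mqtask(name):
    @celery.task(name=name)
    def mqtask(msg, mq_class=name):  # the task name doubles as the MQWorker class
        return myURL('http://127.0.0.1:8181/{0}'.format(mq_class)).post(msg=msg)
    return mqtask

# Callers then only need to supply the message:
# all_tasks['StampPDF'].delay(msg='{"context_key":12345}')
````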
4. If we now want to rate-limit a particular task, we can do so via the `flower` web UI tool for managing celery or via the `celery events` command line tool. [Documentation](http://docs.celeryproject.org/en/latest/userguide/monitoring.html)
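Rate limits can also be set programmatically through celery's remote-control API, e.g. (sketch; `celery` here is the app instance):
````
# Limit StampPDF tasks to 4 per minute across all workers
celery.control.rate_limit('StampPDF', '4/m')
````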
One thing to note here is that routing- and name-based segmentation is not mutually exclusive. You could, for instance, have separate high-, medium-, and low-priority queues with separate worker pools assigned to each. Typically you would use celery's auto-routing functionality whereby any message sent to the task with the name 'StampPDF-lowpriority' would automatically be routed to the low-priority queue.
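That auto-routing would be a name-to-queue mapping in the settings, something like (sketch; the queue name is hypothetical):
````
CELERY_ROUTES = {
    'StampPDF-lowpriority': {'queue': 'low-priority'},
}
````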
Instagram segments their celery jobs differently. It is possible to run the celery workers using an asynchronous event loop instead of the standard pre-fork model that we're familiar with from our MQWorker work. Instagram found, however, that not all of their standard workers were a good match for event-based I/O, so they route the tasks that are heavily network-bound (e.g. cross-posting to Facebook, Tumblr, or Twitter, or uploading to S3) into their async celery pool, while everything else goes to the regular pre-fork pool.
Disqus segments differently, with separate pools for jobs which they expect to be of "short", "medium," and "long" duration, since they've found that having long-running tasks will very quickly starve fast tasks if they live in the same queue. New celery tasks start out in either the "medium" or "long" duration queues, and only once they have proven that they are fast are they promoted to the short-duration queue.
- - -
### Possibility the third (from perl, emitting celery-esque JSON, in combination with the first or second possibilities above)
#### This is probably what we want to do, with the caveat that I haven't yet actually tried it
In looking at the description of the RabbitMQ communications that happen over the course of a celery task's lifetime, it should be pretty clear that we could fairly easily decide to emit, from our perl MQSender code, JSON that conforms to celery's JSON expectations. There's nothing in the celery JSON that looks particularly difficult to emulate (forge). We'd just need to decide whether we're going with routing-based or name-based segmentation of processes.
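For concreteness, here is the message body our perl MQSender would need to emit, sketched in python using the fields from the JSON blobs above (the exact required field set is an assumption that needs verifying against a real celery message):
````
import json
import uuid

# Sketch: a celery-esque message body (name-based segmentation variant).
message = json.dumps({
    "id": str(uuid.uuid4()),          # celery task ids are uuid4 strings
    "task": "StampPDF",               # registered task name
    "args": [],
    "kwargs": {"msg": json.dumps({"context_key": 12345})},
    "retries": 0,
    "eta": None,
    "expires": None,
    "callbacks": None,
    "errbacks": None,
    "chord": None,
    "taskset": None,
    "utc": True,
})
````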
The only downside here is that I haven't actually tried it, although I don't see anything preventing this approach, with one possible exception: celery tracks the lifetime of a task id in some sort of ephemeral cache (memcached?), and you can inspect the state of a given task in the CLI or in the Flower web UI. I'm not sure how those tools will react to a task that bypasses the normal celery workflow and magically (from celery's point of view) ends up in RabbitMQ without first going through the "adding-to-RabbitMQ" state. It is possible that they will throw out a task that skips steps in the celery life-cycle, though that design choice seems unlikely: if the ephemeral cache of task ids ever got out of sync with the authoritative list of tasks (RabbitMQ), celery would start throwing out legitimate tasks.