Skip to content

Instantly share code, notes, and snippets.

@rduplain
Forked from ask/gist:2014539
Created March 11, 2012 18:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rduplain/2017517 to your computer and use it in GitHub Desktop.
Save rduplain/2017517 to your computer and use it in GitHub Desktop.
celery consumer service using boot-steps.
from celery import abstract
from celery import current_app
from kombu import Exchange, Queue
from kombu.mixins import ConsumerMixin
# need to subclass the result backend so that it uses a topic exchange
# instead of direct, and send the results for tasks using a routing_key
# of the format:
# results.%(task_name)s.%(task_id)s
class BlaConsumer(ConsumerMixin):
exchange = Exchange("celeryresults", type="topic")
def __init__(self):
self.connection = current_app.broker_connection()
self.total = 0
def get_consumers(self, Consumer, channel):
return [Consumer(queues=Queue("results_fan",
exchange=self.exchange,
routing_key="results.#"),
callbacks=[self.handle_result])]
def handle_result(self, result):
# could collect messages here and flush
# every second, or for every 100 message received.
#if not self.total % 100 or time() > flush_every:
task_id = result["task_id"]
access_token = result["result"]["access_token"]
if should_send_notification:
notify_user(access_token.user)
class BlaComponent(abstract.StartStopComponent):
"""CELERY_BOOT_STEPS = ("name_of_this_module")"""
name = "worker.bla"
requires = ()
def create(self, worker):
# celeryd will now call obj.start when it starts, and obj.stop when it
# shuts down.
obj = BlaConsumer()
return obj
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment