Skip to content

Instantly share code, notes, and snippets.

@fredcamps
Last active April 11, 2018 20:35
Show Gist options
  • Save fredcamps/2053406275b7526dda19bd591886db72 to your computer and use it in GitHub Desktop.
Save fredcamps/2053406275b7526dda19bd591886db72 to your computer and use it in GitHub Desktop.
celery dinamic queue bootstrap
from celery import Celery
from kombu import Queue, Exchange
PREFIX = 'baz'
EXCHANGE = 'exchange'
def get_task_names():
return (
'%s.foo.bar' % PREFIX,
'celery',
)
def get_queues():
queues = []
exchange = Exchange(name=EXCHANGE, type='direct')
for name in get_task_names():
queue = Queue(name=name,
exchange=exchange,
routing_key=name,)
queues.append(queue)
return queues
celery_app = Celery()
celery_app.conf.update(task_queues=get_queues())
celery_app.autodiscover_tasks(get_task_names())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment