Skip to content

Instantly share code, notes, and snippets.

@traut
Created April 22, 2015 16:52
Show Gist options
  • Save traut/679c72470b16de9e9b3f to your computer and use it in GitHub Desktop.
Save traut/679c72470b16de9e9b3f to your computer and use it in GitHub Desktop.
Code needed to reproduce Celery Exchange issue
from __future__ import print_function
import pprint
import time
from celery import group, chain, chord
import simple_tasks as t
# One signature per task exposed by the simple_tasks module (imported as `t`).
tasks = [
    t.taskA.s(),
    t.taskB.s(),
    t.taskC.s(),
    t.taskD.s(),
]

# Submit every signature and keep the object returned by delay() so its
# status can be polled later.  The loop variable is deliberately not named
# `t`: that would rebind the `simple_tasks` module alias used above.
results = [signature.delay() for signature in tasks]
def print_status(result):
    """Print a one-line summary of a result object to stdout.

    The line starts with the object's class name.  When the object exposes a
    ``state`` attribute the line continues with ``task_name``, ``id`` and the
    state in parentheses; otherwise only ``id`` is shown.

    Args:
        result: Object with an ``id`` attribute and, optionally, ``state``
            and ``task_name`` attributes.
    """
    kind = type(result).__name__
    if hasattr(result, 'state'):
        details = "{} {} ({})".format(result.task_name, result.id, result.state)
    else:
        details = "{}".format(result.id)
    # A single one-argument print call behaves the same with the print
    # statement and the print function.
    print("{} {}".format(kind, details))
# Poll forever: print the status of every submitted result, then pause for
# five seconds before the next round.  Interrupt the script to stop it.
# NOTE(review): the pause is assumed to happen once per round rather than
# once per result -- confirm against the intended behaviour.
while True:
    for r in results:
        print_status(r)
    time.sleep(5)
from kombu import Queue, Exchange
# Topic exchange shared by the queues and the task router in this module.
custom_exchange = Exchange(name='custom_exchange', type='topic')
class TaskRouter(object):
    """Router that sends every task to ``custom_exchange``.

    The string form of the task is used as the routing key.
    """

    def route_for_task(self, task, *args, **kwargs):
        """Return the routing options for ``task``.

        Args:
            task: Task identifier; converted with ``str()`` to build the
                routing key.
            *args: Accepted and ignored.
            **kwargs: Accepted and ignored.

        Returns:
            A dict with the keys ``'exchange'`` and ``'routing_key'``.
        """
        # The parenthesised single-argument form prints the same text under
        # both the Python 2 print statement and the Python 3 print function.
        print("Routing %s" % task)
        return {
            'exchange': custom_exchange,
            'routing_key': str(task),
        }
# Routing-related Celery settings, meant to be merged into an app's config:
# two queues bound to `custom_exchange` with the patterns 'simple.#' and
# 'complex.#', a TaskRouter instance as the only router, and the default
# exchange name.
ROUTING_CONFIG = dict(
    CELERY_QUEUES=(
        Queue('simple', exchange=custom_exchange, routing_key='simple.#'),
        Queue('complex', exchange=custom_exchange, routing_key='complex.#'),
    ),
    CELERY_ROUTES=(TaskRouter(),),
    CELERY_DEFAULT_EXCHANGE='custom_exchange',
)
from __future__ import print_function
import time
from celery import Celery
# Redis URI (host, port, database number) used for both the broker and the
# result backend.
redis_uri = "redis://%(host)s:%(port)d/%(db)d" % {
    'host': 'localhost',
    'port': 6379,
    'db': 0,
}
# Celery application named after this module (``__main__`` when the file is
# executed directly).
app = Celery(main=__name__)
from routing import ROUTING_CONFIG
# Base settings: the same Redis URI serves as broker and result backend, and
# pickle is the only serializer in use.
# NOTE(review): accepting pickle lets whoever can write to the broker send
# arbitrary pickled payloads -- presumably acceptable for a local
# reproduction only; confirm before reusing elsewhere.
config = {
    'BROKER_URL': redis_uri,
    'CELERY_RESULT_BACKEND': redis_uri,
    'CELERY_TASK_SERIALIZER': 'pickle',
    'CELERY_RESULT_SERIALIZER': 'pickle',
    'CELERY_ACCEPT_CONTENT': ['pickle'],
    'CELERYD_LOG_FORMAT': "%(message)s",
    'CELERY_TRACK_STARTED': True,
}

# Layer the routing settings on top, then hand everything to the app.
config.update(ROUTING_CONFIG)
app.conf.update(config)
def run_task(c, duration=5):
    """Simulate a unit of work labelled ``c``.

    Prints a start message, sleeps, prints a finish message and returns a
    result string.

    Args:
        c: Label interpolated into the printed messages and the result.
        duration: Seconds to sleep between the two messages.  Defaults to 5,
            the value that used to be hard-coded.

    Returns:
        The string ``"result task <c>"``.
    """
    print("task %s start" % c)
    time.sleep(duration)
    print("task %s finish" % c)
    return "result task %s" % c
@app.task(name='simple.task.a')
def taskA(*args, **kwargs):
    """Task registered as ``simple.task.a``.

    Any positional or keyword arguments are accepted and ignored.

    Returns:
        The value of ``run_task('a')``.
    """
    return run_task('a')
@app.task(name='simple.task.b')
def taskB(*args, **kwargs):
    """Task registered as ``simple.task.b``.

    Any positional or keyword arguments are accepted and ignored.

    Returns:
        The value of ``run_task('b')``.
    """
    return run_task('b')
@app.task(name='simple.task.c')
def taskC(*args, **kwargs):
    """Task registered as ``simple.task.c``.

    Any positional or keyword arguments are accepted and ignored.

    Returns:
        The value of ``run_task('c')``.
    """
    return run_task('c')
@app.task(name='complex.task.d')
def taskD(*args, **kwargs):
    """Task registered as ``complex.task.d``.

    Any positional or keyword arguments are accepted and ignored.

    Returns:
        The value of ``run_task('d')``.
    """
    return run_task('d')
# Run the app's worker entry point when this module is executed as a script.
if __name__ == '__main__':
    app.worker_main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment