Skip to content

Instantly share code, notes, and snippets.

@spumer
Created December 4, 2018 09:35
Show Gist options
  • Save spumer/6a031205dcdf4b9f0e560f54f49b27af to your computer and use it in GitHub Desktop.
Save spumer/6a031205dcdf4b9f0e560f54f49b27af to your computer and use it in GitHub Desktop.
Django celery dlx
class Celery(celery.Celery):
    def on_config_ready(self):
        """Declare the default queue with dead-letter (DLX) support.

        Must be invoked manually once all Celery configuration has been
        prepared. Does nothing unless ``task_default_queue_declare_dlx``
        is enabled in the configuration.
        """
        assert DeclareDLXnDLQ not in self.steps['worker']

        cfg = self.conf
        if not cfg.get('task_default_queue_declare_dlx', False):
            return

        queue_name = cfg.task_default_queue

        # Bypass Celery's automatic queue creation so we can attach the
        # special x-dead-letter-* arguments to the default queue.
        default_exchange = Exchange(
            name=cfg.task_default_exchange,
            type=cfg.task_default_exchange_type,
            durable=cfg.broker_durable,
            auto_delete=cfg.broker_auto_delete,
        )
        default_queue = Queue(
            queue_name,
            default_exchange,
            routing_key=cfg.task_default_routing_key,
            queue_arguments={
                'x-dead-letter-exchange': cfg.broker_dlx_exchange,
                'x-dead-letter-routing-key': queue_name,
            },
        )

        # If the DLQ itself were injected into task_default_queues, the
        # worker would consume from it and it would always stay empty —
        # hence the DLX/DLQ are declared manually via a bootstep instead.
        cfg.task_queues = (default_queue, )

        # Extra bootstep that declares the DLX and DLQ on worker start.
        self.steps['worker'].add(DeclareDLXnDLQ)
# https://stackoverflow.com/a/46607586
class DeclareDLXnDLQ(bootsteps.StartStopStep):
    """Worker bootstep that declares the dead-letter exchange (DLX) and
    its queue (DLQ) before the worker begins processing tasks.
    """

    requires = {'celery.worker.components:Pool'}
    logger = logging.getLogger('Celery DLX Step')

    def start(self, worker):
        conf = worker.app.conf
        routing_key = conf.task_default_queue

        self.logger.info('Prepare DLX')

        # The DLQ is bound to the DLX under the default queue's name, so
        # messages dead-lettered from the default queue land in it.
        dead_letter_exchange = Exchange(
            name=conf.broker_dlx_exchange,
            durable=conf.broker_durable,
            auto_delete=conf.broker_auto_delete,
        )
        dead_letter_queue = Queue(
            name=f'{conf.broker_dlx_prefix}.{conf.task_default_queue}',
            durable=conf.broker_durable,
            auto_delete=conf.broker_auto_delete,
            bindings=[
                binding(exchange=dead_letter_exchange, routing_key=routing_key),
            ],
        )

        self.logger.info('Declare DLX/DLQ')
        # NOTE(review): declaring the bound queue is expected to declare the
        # exchange/binding as well — confirm against the kombu version in use.
        with worker.app.pool.acquire() as conn:
            dead_letter_queue.bind(conn).declare()
# Important! Without this option, failed tasks will not be placed into the DLX.
# True - ACK only after the task has been executed, False - immediately on receipt
# http://docs.celeryproject.org/en/latest/userguide/tasks.html#reject
# http://docs.celeryproject.org/en/latest/faq.html#should-i-use-retry-or-acks-late
CELERY_TASK_ACKS_LATE = True

# True - declare a DLX for the default queue, False - leave it as is
CELERY_TASK_DEFAULT_QUEUE_DECLARE_DLX = True

# Name of the dead-letter exchange and the prefix for dead-letter queue names
CELERY_BROKER_DLX_EXCHANGE = 'DLX'
CELERY_BROKER_DLX_PREFIX = 'dlx'

# Durability/auto-delete flags applied to the declared exchange and queues
CELERY_BROKER_DURABLE = True
CELERY_BROKER_AUTO_DELETE = False
@spumer
Copy link
Author

spumer commented Dec 4, 2018

По идее всё это можно решить политикой в rabbitmq:

После выполнения которой достаточно создать зеркальную очередь с префиксом dlx, н-р dlx.celery

$ cat declare-dlx-policy.sh
#!/bin/bash
# Declare a dead-letter exchange (DLX), a dead-letter queue for "celery",
# and a RabbitMQ policy that dead-letters every queue into the DLX.
#
# Environment overrides: HOST (default localhost), PORT (default 15672),
# OPTS (extra rabbitmqadmin options).

BIN="./rabbitmqadmin"
DLX_NAME="DLX"
DLX_PREFIX="dlx."

HOST=${HOST:-localhost}
PORT=${PORT:-15672}
OPTS=${OPTS:-}
OPTS="-H ${HOST} -P ${PORT} ${OPTS}"

# Declare the dead-letter queue "dlx.<name>" and bind it to the DLX with
# routing key "<name>".
function create_named_dlx () {
    $BIN ${OPTS} declare queue name="${DLX_PREFIX}${1}" durable=true
    $BIN ${OPTS} declare binding source="${DLX_NAME}" destination="${DLX_PREFIX}${1}" routing_key="${1}"
}

# Declare the DLX exchange, apply the dead-letter policy to all queues,
# and create the mirror dead-letter queue for the default "celery" queue.
function create_all () {
    $BIN ${OPTS} declare exchange name="${DLX_NAME}" type=direct durable=true
    $BIN ${OPTS} declare policy name="${DLX_NAME}" pattern='.*' definition="{\"dead-letter-exchange\":\"${DLX_NAME}\"}" apply-to=queues
    create_named_dlx celery
}

function main () {
    # BUGFIX: use a command group { ...; } instead of a subshell ( ... ).
    # In the original, "exit 1" only terminated the subshell, so the script
    # carried on to create_all even when rabbitmqadmin was missing.
    hash $BIN || { echo "Get rabbitmqadmin executable from http://rabbitmq-host:15672/cli/rabbitmqadmin"; exit 1; }
    create_all
}
main

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment