Skip to content

Instantly share code, notes, and snippets.

@VMois
Created February 9, 2022 09:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save VMois/362f42f933b7f46f40a80966d9585675 to your computer and use it in GitHub Desktop.
Save VMois/362f42f933b7f46f40a80966d9585675 to your computer and use it in GitHub Desktop.
RabbitMQ Python Kombu message delay example

RabbitMQ Python Kombu message retry

  1. Build RabbitMQ:
$ docker build -t rabbitmq-delay .
  1. Start RabbitMQ:
$ docker run -d -p 5672:5672 -p 15672:15672 --hostname my-rabbit --name some-rabbit rabbitmq-delay
  1. Install kombu:
$ pip3 install kombu
  1. Run consumer:
$ python3 consumer.py
  1. Publish message:
$ python3 producer.py
from kombu import Connection, Exchange, Queue, Producer
from kombu.mixins import ConsumerMixin
rabbit_url = "amqp://localhost:5672/"
MAX_RETRIES = 3
RETRY_DELAY=3000 # in ms
class Worker(ConsumerMixin):
def __init__(self, connection, queues):
self.connection = connection
self.queues = queues
# create a delayed exchange
channel = self.connection.channel()
self.retry_exchange = Exchange("workflow-submission.retry", type="x-delayed-message", arguments={"x-delayed-type": "direct"})
queue = Queue("workflow-submission", exchange=self.retry_exchange, routing_key="workflow-submission")
queue(channel).declare()
self.producer = Producer(exchange=self.retry_exchange, channel=channel, routing_key="workflow-submission")
self.publish = self.connection.ensure(
self.producer,
self.producer.publish,
max_retries=3,
)
def get_consumers(self, Consumer, channel):
return [Consumer(queues=self.queues,
callbacks=[self.on_message],
prefetch_count=1)]
def on_message(self, body, message):
message.ack()
retries_count = message.headers.get("x-retries", 0)
print(f'got message: {body}, retries_count: {retries_count}')
if retries_count < MAX_RETRIES:
print(f"sent retry for {body}")
self.publish(body,
headers={
"x-retries": retries_count + 1,
"x-delay": RETRY_DELAY,
})
else:
print(f"No more retries left for {body}")
queues = [Queue("workflow-submission")]
with Connection(rabbit_url, heartbeat=4) as conn:
try:
worker = Worker(conn, queues)
worker.run()
except:
raise
FROM rabbitmq:3.9-management
RUN apt-get -o Acquire::Check-Date=false update && apt-get install -y curl
RUN curl -L https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.9.0/rabbitmq_delayed_message_exchange-3.9.0.ez > $RABBITMQ_HOME/plugins/rabbitmq_delayed_message_exchange-3.9.0.ez
RUN chown rabbitmq:rabbitmq $RABBITMQ_HOME/plugins/rabbitmq_delayed_message_exchange-3.9.0.ez
RUN rabbitmq-plugins enable rabbitmq_delayed_message_exchange
RUN rabbitmq-plugins enable rabbitmq_management
import queue
from kombu import Connection, Exchange, Queue, Producer
rabbit_url = "amqp://localhost:5672/"
conn = Connection(rabbit_url)
channel = conn.channel()
exchange = Exchange("workflow-submission", type="direct")
queue = Queue("workflow-submission", type="direct", routing_key="workflow-submission", exchange=exchange)
queue(channel).declare()
producer = Producer(exchange=exchange, channel=channel)
producer.publish("Workflow 1", declare=[exchange], routing_key="workflow-submission")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment