Skip to content

Instantly share code, notes, and snippets.

@alexpearce
Created August 4, 2022 16:23
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save alexpearce/fb7b91831363102b5887fde25167165f to your computer and use it in GitHub Desktop.
Save alexpearce/fb7b91831363102b5887fde25167165f to your computer and use it in GitHub Desktop.
Real-time consumer of Celery's event queue.
import pika
#: Name of the Celery events exchange (declared passively below, so this script never creates it)
EXCHANGE_NAME = "celeryev"
#: Arbitrary name for the transient queue this script will create and consume from
QUEUE_NAME = "pikatest"
#: Global holding our channel object; None until on_channel_open assigns it
channel = None
# Step #2
def on_connected(connection):
    """Open a channel once the connection to RabbitMQ is established.

    Registered as the ``on_open_callback`` of the ``SelectConnection``.

    Args:
        connection: The connection object passed to the on-open callback.
    """
    print("Connected")
    # Opening a channel is asynchronous; on_channel_open continues the chain.
    connection.channel(on_open_callback=on_channel_open)
# Step #3
def on_channel_open(new_channel):
    """Keep hold of the opened channel and declare the events exchange.

    The channel is stored in the module-level ``channel`` global so the
    later callbacks in the chain can use it.

    Args:
        new_channel: The channel object passed to the channel-open callback.
    """
    global channel
    channel = new_channel

    print("Declaring exchange")
    # passive=True asks the broker to check for the exchange rather than
    # create it; on_exchange_declared runs once the broker has replied.
    new_channel.exchange_declare(
        exchange=EXCHANGE_NAME,
        exchange_type=pika.exchange_type.ExchangeType.topic,
        passive=True,
        callback=on_exchange_declared,
    )
# Step #4
def on_exchange_declared(frame: pika.frame.Method):
    """Declare our transient queue once the exchange declaration has been answered.

    Args:
        frame: The response frame from RabbitMQ (unused here).
    """
    print("Declaring queue")
    # Non-durable, exclusive and auto-deleted: the queue is only meant to
    # live for as long as this script is consuming from it.
    channel.queue_declare(
        QUEUE_NAME,
        durable=False,
        exclusive=True,
        auto_delete=True,
        callback=on_queue_declared,
    )
# Step #5
def on_queue_declared(frame: pika.frame.Method):
    """Bind our queue to the events exchange once the queue has been declared.

    Args:
        frame: The response frame from RabbitMQ (unused here).
    """
    print("Binding queue")
    # "#" is the topic wildcard for any routing key, so every event
    # published to the exchange is routed to our queue.
    channel.queue_bind(
        queue=QUEUE_NAME,
        exchange=EXCHANGE_NAME,
        routing_key="#",
        callback=on_queue_bound,
    )
# Step #6
def on_queue_bound(frame: pika.frame.Method):
    """Start consuming once RabbitMQ has confirmed the queue binding.

    Args:
        frame: The response frame from RabbitMQ (unused here).
    """
    print("Consuming from queue")
    # Each delivery on the queue is passed to handle_delivery.
    channel.basic_consume(queue=QUEUE_NAME, on_message_callback=handle_delivery)
# Step #7
def handle_delivery(
    channel: pika.channel.Channel,
    method: pika.spec.Basic.Deliver,
    header: pika.spec.BasicProperties,
    body: bytes,
):
    """Print a delivered message and acknowledge it.

    Args:
        channel: The channel the message was delivered on.
        method: Delivery metadata; supplies the delivery tag used for the ack.
        header: The message properties (unused here).
        body: The raw message payload.
    """
    # Worker heartbeat events are deliberately not printed here, to reduce
    # log volume.
    is_heartbeat = b"worker-heartbeat" in body
    if not is_heartbeat:
        print("Handling delivery:", body)

    # Acknowledge every delivery, printed or not.
    channel.basic_ack(delivery_tag=method.delivery_tag)
# Step #1: Connect to RabbitMQ with an explicit host, port and credentials
credentials = pika.PlainCredentials("rabbitmquser", "rabbitmqpassword")
parameters = pika.ConnectionParameters(
    host="hostname",
    port=5672,
    credentials=credentials,
)
# on_connected starts the callback chain once the connection is open.
connection = pika.SelectConnection(parameters, on_open_callback=on_connected)

try:
    # Run the I/O loop so we can communicate with RabbitMQ
    connection.ioloop.start()
except KeyboardInterrupt:
    # Ctrl-C: gracefully close the connection
    connection.close()
    # Restart the loop until we're fully closed; it will stop on its own
    connection.ioloop.start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment