Skip to content

Instantly share code, notes, and snippets.

@javierarilos
Last active February 9, 2021 18:23
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save javierarilos/9348168 to your computer and use it in GitHub Desktop.
Save javierarilos/9348168 to your computer and use it in GitHub Desktop.
RabbitMq messaging concepts using Python + pika
from pika import BlockingConnection, ConnectionParameters, BasicProperties
##########################################################################
# RABBITMQ & AMQP INTRO - MESSAGING PATTERNS
# http://www.slideshare.net/javierarilos/rabbitmq-intromsgingpatterns
#
# ******** Escenario 1:
# *
# Using pika we are going to create a exchange named 'important', bind it to a queue named 'important-jobs'.
# Finally we will produce (to exchange) and consume (from queue) a message.
# escenario 1 - step 1 - connect & channel setup
conn = BlockingConnection(ConnectionParameters('localhost'))
ch = conn.channel()
# escenario 1 - step 2 - declare exchange
ch.exchange_declare(exchange='important', type='direct')
# escenario 1 - step 3 - declare the queue
ch.queue_declare(queue='important-jobs')
# escenario 1 - step 4 - bind queue and exchange
ch.queue_bind(exchange='important', queue='important-jobs', routing_key='important')
# escenario 1 - step 5 - produce the message
ch.basic_publish(exchange='important', routing_key='important', body='new important task')
# escenario 1 - step 6 - consume the message
method_frame, header_frame, body = ch.basic_get('important-jobs')
print "msg received from queue 'important-jobs' : ", body
# escenario 1 - step 6 - acknowledge the message
ch.basic_ack(method_frame.delivery_tag)
# ******** Escenario 2:
# *
# All important messages have to be sent also to a new queue named traces.
# We will produce an important message and consume it from both queues: important-jobs and traces
# escenario 2 - step 1 - create and bind new traces queue, send an important message
ch.queue_declare(queue='traces')
ch.queue_bind(exchange='important', queue='traces', routing_key='important')
ch.basic_publish(exchange='important', routing_key='important', body='[another task to be handled, important]')
# escenario 2 - step 2 - consume message from both queues
method_frame, header_frame, important_job = ch.basic_get('important-jobs')
print "msg received from queue 'important-jobs' : ", important_job
ch.basic_ack(method_frame.delivery_tag)
method_frame, header_frame, trace = ch.basic_get('traces')
print "msg received from queue 'traces' : ", trace
ch.basic_ack(method_frame.delivery_tag)
# ******** Escenario 3:
# *
# customer messages to important exchange must be routed to
# different queues depending on the operation to perform (signup, update) and also to traces
# escenario 3 - step 1 - bind 'traces' to exchange 'important' and 'customer' routing key, create customer queues
ch.queue_bind(exchange='important', queue='traces', routing_key='customer')
ch.queue_declare(queue='signup')
ch.queue_declare(queue='update')
# escenario 3 - step 2 - create new exchange 'customer' of type headers, bind it to 'important' on routing key 'customer'
ch.exchange_declare(exchange='customer', type='headers')
ch.exchange_bind(source='important', destination='customer', routing_key='customer')
ch.queue_bind(exchange='customer', queue='signup', routing_key='', arguments={'operation': 'signup', 'x-match':'any'})
ch.queue_bind(exchange='customer', queue='update', routing_key='', arguments={'operation': 'update', 'x-match':'any'})
# escenario 3 - step 3 - sending a customer signup message (with headers), consume it (from 'signup' and 'traces')
ch.basic_publish(exchange='important', routing_key='customer', body='this is our new customer num=25',
properties=BasicProperties(headers={'operation': 'signup'}))
method_frame, header_frame, msg = ch.basic_get('signup')
print "msg received from queue 'signup' : ", msg
ch.basic_ack(method_frame.delivery_tag)
method_frame, header_frame, trace = ch.basic_get('traces')
print "msg received from queue 'traces' : ", trace
ch.basic_ack(method_frame.delivery_tag)
# ******** Escenario 4:
# *
# RabbitMQ deadleter exchanges.
# Messages in important-jobs queue that cannot be handled will be rejected by client and sent to rejected-jobs exchange.
# escenario 4 - step 1 - create rejected-jobs exchange and queue
ch.exchange_declare(exchange='rejected-jobs', type='direct')
ch.queue_declare(queue='rejected-jobs')
ch.queue_bind(exchange='rejected-jobs', queue='rejected-jobs', routing_key='important')
# escenario 4 - step 2 - redeclare important-jobs queue to deadletter messages, redeclare bindings
ch.queue_delete('important-jobs')
ch.queue_declare(queue='important-jobs', arguments={'x-dead-letter-exchange': 'rejected-jobs'})
ch.queue_bind(exchange='important', queue='important-jobs', routing_key='important')
# escenario 4 - step 3 - publish to important-jobs, consumer rejects the message
ch.basic_publish(exchange='important', routing_key='important', body='[unparseable message]')
method_frame, header_frame, important_job = ch.basic_get('important-jobs')
print "UNPARSEABLE msg received from queue 'important-jobs' : ", important_job, " >> rejecting msg"
ch.basic_reject(method_frame.delivery_tag, requeue=False)
# escenario 4 - step 4 - message was routed to rejected-jobs
method_frame, header_frame, rejected_job = ch.basic_get('rejected-jobs')
print "i know what to do with unparseable messages: received in 'rejected-jobs' : ", rejected_job
ch.basic_ack(method_frame.delivery_tag)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment