RabbitMQ with Python

Installing Python

We installed Python 3.6.8 because version 3.7 was causing problems.

Next, install pip and then the pika client library:

python get-pip.py

python -m pip install -U pika

The environment is now ready.
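A quick way to confirm the installation is to ask Python whether the pika module can be found. This is a minimal sketch using only the standard library; the helper name is_installed is made up for illustration.

```python
import importlib.util


def is_installed(module_name):
    """Return True if the named top-level module can be found on the import path."""
    return importlib.util.find_spec(module_name) is not None


if __name__ == "__main__":
    # "pika" is the client library installed with pip above.
    print("pika installed:", is_installed("pika"))
```

If this prints False, the pip command was probably run with a different Python interpreter than the one executing the script.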

Sending and receiving messages

[Figure: Queue]

Sending a message

#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

messages = ["First message", "Second message", "Third message", "Fourth message", "Fifth message"]

# Publish through the default exchange (''), which routes to the queue named by routing_key.
for message in messages:
    channel.basic_publish(exchange='',
                          routing_key='hello',
                          body=message)
    print(" [x] Sent '" + message + "'")
    time.sleep(1)

connection.close()

The call below fetches the queue if it already exists, or creates it otherwise:

channel.queue_declare(queue='hello')

Receiving messages

#!/usr/bin/env python
import pika

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_consume(queue='hello',
                      auto_ack=True,
                      on_message_callback=callback)
					  
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

Manual ACK

With auto_ack=True, RabbitMQ treats a message as acknowledged as soon as it is delivered to the consumer.

If the flag is False, the consumer has to acknowledge each message manually.

By default RabbitMQ dispatches messages to the consumers of a queue in round-robin order, so to share the load you only need to start another receiver.
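The round-robin behaviour can be pictured with a toy model in plain Python. This is only an illustration of the dispatch order, not RabbitMQ's implementation; the function name and the receiver names are hypothetical.

```python
from itertools import cycle


def dispatch_round_robin(messages, consumers):
    """Assign each message to the next consumer in turn (toy model)."""
    assignment = {name: [] for name in consumers}
    turn = cycle(consumers)
    for message in messages:
        assignment[next(turn)].append(message)
    return assignment


if __name__ == "__main__":
    result = dispatch_round_robin(
        ["m1", "m2", "m3", "m4", "m5"], ["receiver-1", "receiver-2"])
    for name, received in result.items():
        print(name, received)
```

With two receivers, the first one gets the odd-numbered messages and the second one the even-numbered messages, which is what you should observe when running two copies of the receiver script.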

[Figure: Queue]

Let's look at an example with auto_ack=False, in which the program only notifies RabbitMQ once processing has finished.

#!/usr/bin/env python
import pika
import time

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(1)  # simulate one second of work
    print(" [x] Done")
    # Acknowledge only after the work is finished.
    ch.basic_ack(delivery_tag=method.delivery_tag)
	

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_consume(queue='hello',
                      auto_ack=False,
                      on_message_callback=callback)
					  
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

At the end of the callback, the consumer tells RabbitMQ that the message has been processed:

ch.basic_ack(delivery_tag = method.delivery_tag)
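The point of acknowledging manually is that a message delivered to a consumer that disconnects before acknowledging it is not lost: the broker makes it available for delivery again. The sketch below is a toy model of that bookkeeping in plain Python; the ToyQueue class and its method names are hypothetical and do not reflect how RabbitMQ is implemented.

```python
from collections import deque


class ToyQueue:
    """Toy model of a queue with manual acknowledgements."""

    def __init__(self, messages):
        self.ready = deque(messages)  # messages waiting to be delivered
        self.unacked = {}             # delivery_tag -> message awaiting an ack
        self._next_tag = 1

    def deliver(self):
        """Hand the next ready message to a consumer and remember it as unacked."""
        message = self.ready.popleft()
        tag = self._next_tag
        self._next_tag += 1
        self.unacked[tag] = message
        return tag, message

    def ack(self, delivery_tag):
        """The consumer confirms processing; the message is dropped for good."""
        del self.unacked[delivery_tag]

    def consumer_lost(self):
        """The consumer went away: put its unacked messages back at the front."""
        for tag in sorted(self.unacked, reverse=True):
            self.ready.appendleft(self.unacked.pop(tag))


if __name__ == "__main__":
    queue = ToyQueue(["a", "b", "c"])
    tag, _ = queue.deliver()
    queue.ack(tag)         # "a" was processed and acknowledged
    queue.deliver()        # "b" was delivered but never acknowledged
    queue.consumer_lost()  # so "b" becomes ready again
    print(list(queue.ready))
```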

Exchange

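A producer cannot publish to a queue directly. It publishes to an exchange, and the exchange routes the message to a queue according to the routing_key. The empty string selects the default exchange, which delivers to the queue whose name equals the routing key: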

exchange='',routing_key='hello'
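As a sketch of how this convention might be wrapped, the helper below publishes to a named queue by going through the default exchange. publish_to_queue and RecordingChannel are hypothetical names; RecordingChannel is only a stand-in that records calls, so the example runs without a broker. With a real pika channel, the same basic_publish keyword arguments are used.

```python
class RecordingChannel:
    """Hypothetical stand-in for a pika channel; it only records what was published."""

    def __init__(self):
        self.published = []

    def basic_publish(self, exchange, routing_key, body):
        self.published.append((exchange, routing_key, body))


def publish_to_queue(channel, queue_name, body):
    """Publish body via the default exchange (''), using the queue name as routing key."""
    channel.basic_publish(exchange='', routing_key=queue_name, body=body)


if __name__ == "__main__":
    channel = RecordingChannel()
    publish_to_queue(channel, 'hello', 'First message')
    print(channel.published)
```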

Exchange FANOUT

Use it when every consumer should receive every message.

A fanout exchange routes each message to every queue bound to it and ignores the routing key.
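The broadcast behaviour can be mimicked with a few lines of plain Python. This is a toy model with hypothetical names, meant only to show that each bound queue gets its own copy and that the routing key plays no part.

```python
class ToyFanoutExchange:
    """Toy model of a fanout exchange: every bound queue receives every message."""

    def __init__(self):
        self.queues = {}  # queue name -> list of delivered messages

    def bind(self, queue_name):
        self.queues.setdefault(queue_name, [])

    def publish(self, body, routing_key=''):
        # routing_key is accepted but deliberately ignored.
        for delivered in self.queues.values():
            delivered.append(body)


if __name__ == "__main__":
    exchange = ToyFanoutExchange()
    exchange.bind("consumer-1")
    exchange.bind("consumer-2")
    exchange.publish("First log", routing_key="ignored")
    print(exchange.queues)
```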


send

#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

messages = ["First log", "Second log", "Third log", "Fourth log", "Fifth log"]

# A fanout exchange ignores the routing key, so it is left empty.
for message in messages:
    channel.basic_publish(exchange='logs',
                          routing_key='',
                          body=message)
    print(" [x] Sent '" + message + "'")
    time.sleep(1)

connection.close()

consumer

#!/usr/bin/env python
import pika

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs', queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

Exchange - DIRECT


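With this type of exchange we can define routing rules.

When a message arrives at the exchange, the exchange compares the message's routing key with the binding key of each bound queue and delivers the message to the queues whose binding key matches exactly.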
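The exact-match rule can be written down as a small function. This is a sketch of the routing decision only, with hypothetical queue names, and not the broker's code.

```python
def route_direct(bindings, routing_key):
    """Return the sorted names of queues that have a binding key equal to routing_key.

    bindings maps a queue name to the set of binding keys it was bound with.
    """
    return sorted(queue for queue, keys in bindings.items() if routing_key in keys)


if __name__ == "__main__":
    bindings = {
        "errors_only": {"error"},
        "everything": {"info", "warning", "error"},
    }
    for key in ("info", "error", "debug"):
        print(key, "->", route_direct(bindings, key))
```

A message whose routing key matches no binding, such as "debug" here, is routed to no queue.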

sender

#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

messages = ["First log", "Second log", "Third log", "Fourth log", "Fifth log"]
severities = ["info", "error", "warning", "error", "info"]

# The severity doubles as the routing key that the direct exchange matches on.
for severity, message in zip(severities, messages):
    channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message)
    print(" [x] Sent %r:%r" % (severity, message))
    time.sleep(1)

connection.close()

consumer

#!/usr/bin/env python
import pika
import sys

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(
        exchange='direct_logs', queue=queue_name, routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')

channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

Exchange - TOPIC


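It is more flexible than the direct exchange because binding keys may contain wildcards: * matches exactly one word and # matches zero or more words, where words are separated by dots.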
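To make the wildcard rules concrete, here is a minimal sketch of a matcher for topic binding keys, where * stands for exactly one dot-separated word and # for zero or more words. The function name topic_matches is hypothetical, and the code only illustrates the matching rule; it is not how the broker implements it.

```python
def topic_matches(binding_key, routing_key):
    """Return True if routing_key matches binding_key under topic wildcard rules."""
    pattern = binding_key.split(".")
    words = routing_key.split(".")

    def match(p, w):
        if p == len(pattern):
            # Pattern exhausted: succeed only if every word was consumed too.
            return w == len(words)
        if pattern[p] == "#":
            # '#' may absorb zero or more of the remaining words.
            return any(match(p + 1, k) for k in range(w, len(words) + 1))
        if w == len(words):
            return False
        if pattern[p] == "*" or pattern[p] == words[w]:
            return match(p + 1, w + 1)
        return False

    return match(0, 0)


if __name__ == "__main__":
    for binding in ("A.*", "*.error", "#"):
        for key in ("A.info", "B.error", "A.error"):
            print(binding, key, topic_matches(binding, key))
```

Running the receiver with the binding key "A.*" would therefore select every log from component A, while "*.error" would select errors from any component.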

sender

#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

messages = ["First log", "Second log", "Third log", "Fourth log", "Fifth log"]
severities = ["info", "error", "warning", "error", "info"]
components = ["A", "B", "A", "A", "B"]

for component, severity, message in zip(components, severities, messages):
    # Routing keys take the form <component>.<severity>, e.g. "A.info".
    routing_key = component + "." + severity

    channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message)
    print(" [x] Sent %r:%r" % (routing_key, message))
    time.sleep(1)

connection.close()

receiver

#!/usr/bin/env python
import pika
import sys

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(
        exchange='topic_logs', queue=queue_name, routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')

channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()