Baixamos a versão 3.6.8 do python, pois a versão 3.7 está dando problema.
Depois devemos instalar o pip
python get-pip.py
python -m pip install -U pika
Agora temos o ambiente preparado.
#!/usr/bin/env python
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
messages = ["Primeira mensagem", "Segunda mensagem", "Terceira mensagem", "Quarta mensagem", "Quinta mensagem"];
for message in messages:
channel.basic_publish(exchange='',
routing_key='hello',
body=message)
print(" [x] Enviada '" + message + "'")
time.sleep(1)
connection.close()
Ele busca essa fila caso ela exista ou cria uma nova
channel.queue_declare(queue='hello')
#!/usr/bin/env python
import pika
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_consume(queue='hello',
auto_ack=True,
on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
Configuramos auto_ack=True
para assim que a message é recebida o rabbitmq é avisado.
Caso esse flag estiver como False, seriá necessario fazer esse aviso de forma manual no rabbit.
O rabbitmq já por padrão utiliza Round-robin, ou seja é só subir outro serviço de recepção.
Vamos ver um exemplo com auto_ack=False
, mas o programa só vai avisar o rabbitmq quando oprocessamento terminar.
#!/usr/bin/env python
import pika
import time
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
time.sleep(1)
print(" [x] Done")
ch.basic_ack(delivery_tag = method.delivery_tag)
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_consume(queue='hello',
auto_ack=False,
on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
No fim do callback ele avisa o rabbitmq que a message foi recebida
ch.basic_ack(delivery_tag = method.delivery_tag)
Não podemos se conectar na fila diretamente. Precisamos se conectar em um exchange e dentro dele ele direciona para a fila routing_key
exchange='',routing_key='hello'
Quando queremos que todos os consumidores recebam as mensagens.
O trabalho do exchange FANOUT é rotear as ensagens para todos consumidores.
send
#!/usr/bin/env python
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
messages = ["Primeiro log", "Segundo log", "Terceiro log", "Quarto log", "Quinto log"];
for message in messages:
channel.basic_publish(exchange='logs',
routing_key='',
body=message)
print(" [x] Enviada '" + message + "'")
time.sleep(1)
connection.close()
consumer
#!/usr/bin/env python
import pika
def callback(ch, method, properties, body):
print(" [x] %r" % body)
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
Com esse tipo de exchange conseguimos criar regras de roteamento.
As mensagens chega nesse exchage, ele verifica e descide para quem mandar.
sender
#!/usr/bin/env python
import pika
import time
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
messages = ["Primeiro log", "Segundo log", "Terceiro log", "Quarto log", "Quinto log"];
severities = ["info" , "error" , "warning" , "error" , "info" ];
for i in range(0, 5):
channel.basic_publish(exchange='direct_logs', routing_key=severities[i], body=messages[i])
print(" [x] Sent %r:%r" % (severities[i], messages[i]))
time.sleep(1)
connection.close()
consumer
#!/usr/bin/env python
import pika
import sys
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue
severities = sys.argv[1:]
if not severities:
sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
sys.exit(1)
for severity in severities:
channel.queue_bind(
exchange='direct_logs', queue=queue_name, routing_key=severity)
print(' [*] Waiting for logs. To exit press CTRL+C')
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
Ele é mais maliavel que o exange DIRECT, pois aceita wildcart
sender
#!/usr/bin/env python
import pika
import time
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
messages = ["Primeiro log", "Segundo log", "Terceiro log", "Quarto log", "Quinto log"];
severities = ["info" , "error" , "warning" , "error" , "info" ];
components = ["A" , "B" , "A" , "A" , "B" ];
for i in range(0, 5):
routing_key = components[i] + "." + severities[i]
channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=messages[i])
print(" [x] Sent %r:%r" % (routing_key, messages[i]))
time.sleep(1)
connection.close()
receiver
#!/usr/bin/env python
import pika
import sys
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv[1:]
if not binding_keys:
sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
sys.exit(1)
for binding_key in binding_keys:
channel.queue_bind(
exchange='topic_logs', queue=queue_name, routing_key=binding_key)
print(' [*] Waiting for logs. To exit press CTRL+C')
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()