Skip to content

Instantly share code, notes, and snippets.

@nenodias
Created April 2, 2023 21:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nenodias/36728ca2a1066d832cc1db298db0b1f4 to your computer and use it in GitHub Desktop.
Save nenodias/36728ca2a1066d832cc1db298db0b1f4 to your computer and use it in GitHub Desktop.
Python Pika - AMQP Producer/Consumer

Example using Python + Pika for producing and receiving messages from AMQP

import pika
from pika.exchange_type import ExchangeType
queue_name = 'queue_estoque_novo_pedido'
dlq_name = 'dlq_estoque_novo_pedido'
username = ''
password = ''
host = ''
vhost = ''
def get_channel():
auth = pika.PlainCredentials(username=username, password=password)
params = pika.ConnectionParameters(credentials=auth, host=host, virtual_host=vhost)
connection = pika.BlockingConnection(params)
channel = connection.channel()
channel.exchange_declare(exchange=dlq_name, exchange_type=ExchangeType.topic)
channel.queue_declare(queue=queue_name, durable=True, arguments={
"x-dead-letter-exchange": dlq_name,
"x-dead-letter-routing-key": dlq_name
})
channel.queue_declare(queue=dlq_name, durable=True)
channel.queue_bind(dlq_name, dlq_name, dlq_name)
return (channel, connection)
import pika, json
from typing import List
from pika.channel import Channel
from pika import BasicProperties
from pika.spec import Basic
import config
class Item(object):
def __init__(self, description: str, price: float, quantity: float):
self.description = description
self.price = price
self.quantity = quantity
ListItem = List[Item]
class Order(object):
def __init__(self, total : float, items: ListItem):
self.total = total
self.items = items
def consumer(ch: Channel, method: Basic.Deliver, properties: BasicProperties, body):
print("Consumer")
print(f'body={body}')
print(f'method={method}')
print(f'properties={properties}')
try:
dados = json.loads(body)
order = Order(total=dados["total"], items=[Item(description=i["description"], price=i["price"], quantity=i["quantity"]) for i in dados["items"]])
print(f"{order}")
print(f" [x] Message successfully received!")
except Exception as ex:
props = pika.BasicProperties(
delivery_mode = 2,
headers={"exception": f"{ex}"}
)
ch.basic_publish(config.dlq_name, config.dlq_name, body=body, properties=props)
finally:
ch.basic_ack(delivery_tag = method.delivery_tag)
connection = None
try:
channel, connection = config.get_channel()
ret = channel.basic_consume(queue=config.queue_name, auto_ack=False, on_message_callback=consumer)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
except Exception as ex:
print(f" [x] Excepton: {type(ex)}:{ex}")
finally:
connection.close()
import pika
import config
dados = """{
"total": 462,
"items":[
{"description":"Pelo", "price":0.50, "quantity":60},
{"description":"Chumbinho", "price":4.80, "quantity":90}
]
}"""
channel, connection = config.get_channel()
channel.basic_publish(exchange='', routing_key=config.queue_name, body=dados)
print(f" [x] Sent {dados}")
connection.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment