Skip to content

Instantly share code, notes, and snippets.

@AD0791
Last active February 21, 2022 16:21
Show Gist options
  • Save AD0791/323ea3a563a8f0dcc62e2b98038eddfa to your computer and use it in GitHub Desktop.
Save AD0791/323ea3a563a8f0dcc62e2b98038eddfa to your computer and use it in GitHub Desktop.
RabbitMQ service. The publisher doesn't need to declare the exchange or the queue, or to bind them.
from models.item import Item
from json import dumps
from pika import (
BlockingConnection,
ConnectionParameters,
BasicProperties
)
async def register_item_service(item: dict) -> Item:
    """Publish ``item`` to RabbitMQ as a JSON message and return it as an ``Item``.

    The function declares a durable fanout exchange and a durable queue,
    binds them, publishes the JSON-serialised item with ``delivery_mode=2``
    and then closes the connection.

    Args:
        item: Mapping of item fields. It is copied with ``dict(item)`` and
            must be JSON-serialisable.

    Returns:
        ``Item(**item)`` when the mapping is non-empty; ``None`` when it is
        empty (the message is still published in that case).

    Raises:
        TypeError: If ``item`` cannot be serialised by ``json.dumps``.
        Any exception raised by pika while connecting, declaring or
        publishing; the connection is closed before it propagates.

    NOTE(review): ``BlockingConnection`` performs blocking I/O inside an
    ``async def``; confirm that stalling the event loop here is acceptable.
    """
    exchange_name = "inventory-exchange"
    # One routing key for both bind and publish. The original bound with
    # "register-key" but published with "register_key"; a fanout exchange
    # ignores the key, so the mismatch was latent, but it would drop messages
    # as soon as the exchange type changed.
    routing_key = "register-key"

    _itemr = dict(item)
    message = dumps(_itemr)

    connection = BlockingConnection(ConnectionParameters(host='localhost'))
    try:
        channel = connection.channel()
        channel.exchange_declare(
            exchange=exchange_name,
            exchange_type='fanout',
            durable=True,
        )
        queue = channel.queue_declare(queue='register-item-queue', durable=True)
        channel.queue_bind(
            exchange=exchange_name,
            queue=queue.method.queue,
            routing_key=routing_key,
        )
        channel.basic_publish(
            exchange=exchange_name,
            routing_key=routing_key,
            body=message,
            properties=BasicProperties(
                content_type="application/json",
                delivery_mode=2,
            ),
        )
    finally:
        # Release the socket even when declaring/publishing fails; skip the
        # close if the connection is already down so the original error is
        # not masked by a wrong-state error from close().
        if connection.is_open:
            connection.close()

    if _itemr:
        return Item(**_itemr)
    return None
from models.item import Item
from json import dumps
from core.settings import setting
from pika import (
BlockingConnection,
ConnectionParameters,
BasicProperties
)
async def register_item_service(item: dict) -> Item:
    """Publish ``item`` to RabbitMQ as a JSON message and return it as an ``Item``.

    Host, exchange and routing key are read from ``setting``. This function
    only publishes: it does not declare the exchange or a queue, so the
    exchange is presumably declared elsewhere (e.g. by the consumer) —
    verify against the deployment.

    Args:
        item: Mapping of item fields. It is copied with ``dict(item)`` and
            must be JSON-serialisable.

    Returns:
        ``Item(**item)`` when the mapping is non-empty; ``None`` when it is
        empty (the message is still published in that case).

    Raises:
        TypeError: If ``item`` cannot be serialised by ``json.dumps``.
        Any exception raised by pika while connecting or publishing; the
        connection is closed before it propagates.

    NOTE(review): ``BlockingConnection`` performs blocking I/O inside an
    ``async def``; confirm that stalling the event loop here is acceptable.
    """
    _itemr = dict(item)
    message = dumps(_itemr)

    connection = BlockingConnection(
        ConnectionParameters(host=setting.rabbitmq_host)
    )
    try:
        channel = connection.channel()
        channel.basic_publish(
            exchange=setting.exchange,
            routing_key=setting.routing_key,
            body=message,
            properties=BasicProperties(
                content_type="application/json",
                delivery_mode=2,
            ),
        )
    finally:
        # Release the socket even when publishing fails; skip the close if
        # the connection is already down so the original error is not
        # masked by a wrong-state error from close().
        if connection.is_open:
            connection.close()

    if _itemr:
        return Item(**_itemr)
    return None
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment