Skip to content

Instantly share code, notes, and snippets.

@Sanix-Darker
Forked from gtindo/rabbitmq_manager.py
Created November 3, 2019 02:40
Show Gist options
  • Save Sanix-Darker/3b6ddc4c260bd74f0d25f0cb5b44c4d3 to your computer and use it in GitHub Desktop.
Save Sanix-Darker/3b6ddc4c260bd74f0d25f0cb5b44c4d3 to your computer and use it in GitHub Desktop.
Rabbitmq Client manager
import logging
import pika
import configparser
LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '
'-35s %(lineno) -5d: %(message)s')
LOGGER = logging.getLogger(__name__)
class RabbitmqManager:
channel = None
def __init__(self):
"""
CONFIG FILE FORMAT :
Host = host_name
VirtualHost = virtual_host
Username = username
Password = password
Port = port
QueueNameSend = queue_send
QueueNameReceive = queue_receive
"""
config = configparser.ConfigParser()
config.read("config.txt")
config_rabbitmq = config["RABBITMQ"]
self._HOST_NAME = config_rabbitmq["Host"]
self._PORT = config_rabbitmq["Port"]
self._VIRTUAL_HOST = config_rabbitmq["VirtualHost"]
self._RABBITMQ_CREDENTIALS = pika.PlainCredentials(
username = config_rabbitmq["Username"],
password = config_rabbitmq["Password"]
)
self._QUEUE_NAME_SEND = config_rabbitmq["QueueNameSend"]
self._QUEUE_NAME_RECEIVE = config_rabbitmq["QueueNameReceive"]
def connect(self):
"""Connect to RabbitMQ using the init parameters"""
parameters = pika.ConnectionParameters(
host=self._HOST_NAME,
port=self._PORT,
virtual_host=self._VIRTUAL_HOST,
credentials=self._RABBITMQ_CREDENTIALS,
)
connection = pika.SelectConnection(parameters, on_open_callback=self.on_connected)
return connection
def run(self):
"""Run the example consumer by connecting to RabbitMQ and then
starting the IOLoop to block and allow the SelectConnection to operate
"""
self._connection = self.connect()
try:
self._connection.ioloop.start()
except:
LOGGER.error("Connection Failed")
def on_connected(self, connection):
"""Called when we are fully connected to RabbitMQ"""
LOGGER.info("Connection establish successfully to RabbitMQ Server")
connection.channel(on_open_callback=self.on_channel_open)
def on_channel_open(self, new_channel):
"""Called when our channel has opened"""
global channel
channel = new_channel
channel.queue_declare(
queue=self._QUEUE_NAME_RECEIVE,
durable=True,
exclusive=False,
auto_delete=False,
callback=self.on_queue_declared
)
def on_queue_declared(self, frame):
"""Called When RabbitMQ told us our Queue has been declared,
frame is the response from RabbitMQ
"""
global channel
channel.basic_consume(self._QUEUE_NAME_RECEIVE, self.handle_delevery)
channel.start_consuming()
def handle_delevery(self, channel, method, header, body):
"""Called when we receive a message from RabbitMQ"""
print(body)
#do some action
#route request
def publish_message(self, message):
"""Used to publish a message to RabbitMQ"""
global channel
channel.queue_declare(
queue=self._QUEUE_NAME_SEND,
durable=False
)
channel.basic_publish(
exchange='',
routing_key=self._QUEUE_NAME_SEND,
body=message,
properties=pika.BasicProperties(
delivery_mode=2
)
)
rabbitmq = RabbitmqManager()
rabbitmq.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment