Skip to content

Instantly share code, notes, and snippets.

@roymanigley
Created October 24, 2023 17:57
Show Gist options
  • Save roymanigley/24b0d14296891dafdc37c88155211c10 to your computer and use it in GitHub Desktop.
Save roymanigley/24b0d14296891dafdc37c88155211c10 to your computer and use it in GitHub Desktop.
Helper classes to initialize pika consumers and publishers
import json
import logging
from concurrent.futures import Executor, ThreadPoolExecutor

from pika import BlockingConnection, ConnectionParameters, PlainCredentials
# Module-level logger, named after this module; used by the classes below.
LOG = logging.getLogger(__name__)
class RabbitMqConnection(object):
    """Own a pika ``BlockingConnection`` plus a background keep-alive loop.

    The connection is opened in the constructor. A single worker thread then
    repeatedly calls ``connection.sleep(...)`` on it for as long as it stays
    open.

    NOTE(review): the default port 5671 is conventionally the AMQP-over-TLS
    port (plain AMQP is 5672) and no SSL options are passed to
    ``ConnectionParameters`` here -- confirm the intended default.

    NOTE(review): pika documents ``BlockingConnection`` as not thread-safe;
    driving it from the keep-alive thread while channels are used from
    another thread should be confirmed against the pika documentation.
    """

    def __init__(self,
                 host='127.0.0.1',
                 port=5671,
                 username='guest',
                 password='guest',
                 hearth_beat_interval_seconds=10):
        """Store the connection settings and connect immediately.

        Args:
            host: Broker host name or address.
            port: Broker port.
            username: User name for ``PlainCredentials``.
            password: Password for ``PlainCredentials``.
            hearth_beat_interval_seconds: Duration passed to each
                ``connection.sleep`` call of the keep-alive loop.
        """
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.hearth_beat_interval_seconds = hearth_beat_interval_seconds
        self.connection: BlockingConnection = None
        # concurrent.futures.Executor is an abstract base class whose submit()
        # raises NotImplementedError, so a concrete pool is required.  One
        # worker guarantees at most one keep-alive loop runs at a time.
        self.executor = ThreadPoolExecutor(
            max_workers=1,
            thread_name_prefix='rabbitmq-heartbeat',
        )
        self.reconnect()

    def reconnect(self, force=False):
        """Open the connection if it is missing or closed.

        Args:
            force: When true, close the current connection first so a new
                one is always created.
        """
        LOG.debug(f'connecting / reconnecting to RabbitMq {self.host}:{self.port}')
        if force:
            self._disconnect()
        if self.connection is not None and self.connection.is_open:
            # Already connected: the keep-alive loop submitted for this
            # connection is still responsible for it, so do not start another.
            return
        self.connection = BlockingConnection(
            ConnectionParameters(
                host=self.host,
                port=self.port,
                credentials=PlainCredentials(
                    username=self.username,
                    password=self.password,
                ),
            )
        )
        self.executor.submit(self._hearth_beat)

    def shutdown(self):
        """Close the connection and stop accepting keep-alive tasks."""
        LOG.debug(f'shutting down RabbitMq connection {self.host}:{self.port}')
        self._disconnect()
        self.executor.shutdown(wait=False, cancel_futures=True)

    def _disconnect(self):
        """Close the current connection if there is one and it is open."""
        LOG.debug(f'disconnecting from RabbitMq {self.host}:{self.port}')
        if self.connection is not None and self.connection.is_open:
            self.connection.close()

    def _hearth_beat(self):
        """Keep-alive loop for the connection that is current at start-up.

        The loop ends when that connection is closed or when
        ``self.connection`` has been replaced by a newer one, so a loop
        started for an old connection never takes over its successor.
        """
        connection = self.connection
        try:
            while (connection is not None
                   and connection.is_open
                   and self.connection is connection):
                LOG.debug(f'sending hearth beat to RabbitMq {self.host}:{self.port}')
                connection.sleep(self.hearth_beat_interval_seconds)
        except Exception:
            # This runs inside a Future nobody inspects; without logging here
            # a failure of the background loop would vanish silently.
            LOG.exception(
                f'hearth beat loop for RabbitMq {self.host}:{self.port} stopped'
            )
class RabbitMqPublisher(object):
    """Publish JSON-encoded messages to a single exchange.

    The exchange itself is not declared here.
    NOTE(review): presumably it is expected to exist on the broker already --
    confirm against the deployment.
    """

    def __init__(self, connection: RabbitMqConnection, exchange: str):
        """Remember the target exchange and open a channel.

        Args:
            connection: Wrapper whose ``connection`` attribute is the open
                pika connection to create the channel on.
            exchange: Name of the exchange every message is published to.
        """
        self.connection = connection
        self.exchange = exchange
        self._init_channel()

    def _init_channel(self):
        """Open the channel used for publishing (same way the consumer does)."""
        self.channel = self.connection.connection.channel()

    def publish(self, routing_key: str, data: dict):
        """Serialize ``data`` as JSON and publish it to the exchange.

        Args:
            routing_key: Routing key attached to the message.
            data: JSON-serializable payload.

        Raises:
            TypeError: If ``data`` contains values ``json.dumps`` cannot encode.
        """
        # Encode explicitly so the bytes on the wire are UTF-8 JSON.
        body = json.dumps(data).encode('utf-8')
        self.channel.basic_publish(
            exchange=self.exchange,
            routing_key=routing_key,
            body=body,
        )
class RabbitMqConsumer(object):
    """Declare a durable work queue and a matching dead-letter queue.

    The work queue is named after ``routing_key`` and bound to ``exchange``.
    It is declared with ``x-dead-letter-exchange`` set to ``dlx``. A separate
    queue named ``<routing_key>.dlq`` is bound to the ``dlx`` exchange under
    the same routing key.
    """

    # Name of the direct exchange that receives dead-lettered messages.
    DEAD_LETTER_EXCHANGE = 'dlx'
    # Suffix appended to the work-queue name to form the dead-letter queue name.
    DEAD_LETTER_QUEUE_SUFFIX = '.dlq'

    def __init__(self, connection: RabbitMqConnection, exchange: str, routing_key: str):
        """Open a channel and declare the queue topology.

        Args:
            connection: Wrapper whose ``connection`` attribute is the open
                pika connection to create the channel on.
            exchange: Exchange the work queue is bound to.
            routing_key: Binding key; also used as the work-queue name.
        """
        self.connection = connection
        self.exchange = exchange
        self.routing_key = routing_key
        self._init_channel()
        self._init_queue()
        self._init_dlq()

    def _init_channel(self):
        """Open the channel used for all declarations."""
        self.channel = self.connection.connection.channel()

    def _init_queue(self):
        """Declare the durable work queue and bind it to ``self.exchange``."""
        declared = self.channel.queue_declare(
            self.routing_key,
            durable=True,
            auto_delete=False,
            arguments={
                "x-dead-letter-exchange": self.DEAD_LETTER_EXCHANGE,
            },
        )
        self.channel.queue_bind(
            exchange=self.exchange,
            routing_key=self.routing_key,
            queue=declared.method.queue,
        )

    def _init_dlq(self):
        """Declare the dead-letter exchange and its own, separate queue.

        The dead-letter queue must not reuse the work-queue name. Redeclaring
        that queue with different properties is rejected by the broker, and
        it would make the work queue its own dead-letter target.
        """
        self.channel.exchange_declare(
            self.DEAD_LETTER_EXCHANGE,
            exchange_type='direct',
        )
        declared = self.channel.queue_declare(
            self.routing_key + self.DEAD_LETTER_QUEUE_SUFFIX,
            durable=True,
            auto_delete=False,
        )
        # No x-dead-letter-routing-key is set on the work queue, so the
        # binding key here is the work queue's own routing key.
        self.channel.queue_bind(
            exchange=self.DEAD_LETTER_EXCHANGE,
            routing_key=self.routing_key,
            queue=declared.method.queue,
        )

    def consume(self):
        """Placeholder: consuming is not implemented yet and does nothing."""
        pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment