Created
October 5, 2017 07:33
-
-
Save stanislavkozlovski/3fe8779a120e849b52c4160b376e68b8 to your computer and use it in GitHub Desktop.
A consumer connection via Pika
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class BaseRabbitMQConsumerConnection:
    """
    Base class of a RabbitMQ consumer; it sets up and maintains the connection.

    Subclasses must define the class attributes listed in
    NEEDED_STATIC_VARIABLES so that the exchange, queue and binding can be
    declared. A handler object exposing a ``receive_message`` method must also
    be supplied; it is the one that processes each message (see on_message).

    If RabbitMQ closes the connection and stop() was not requested, a reconnect
    is scheduled 5 seconds later. You should look at the log output, as there
    are limited reasons why the connection may be closed, which usually are
    tied to permission related issues or socket timeouts.

    If the channel is closed, it will indicate a problem with one of the
    commands that were issued and that should surface in the output as well.

    Quick summary of how this class works:
        NotificationsConsumerConnection('amqp://...', handler).run() calls
        connect(), which triggers a chain of callbacks:
        on_connection_open -> on_channel_open -> setup_exchange ->
        on_successful_exchange_declaration -> on_successful_queue_declaration
        -> on_bindok.
        These declare the exchange, declare the queue, bind the queue and
        finally register on_message as the consumer callback.
    """
    # Class attributes every concrete subclass has to provide.
    NEEDED_STATIC_VARIABLES = ['EXCHANGE', 'EXCHANGE_TYPE', 'QUEUE', 'ROUTING_KEY']

    def __init__(self, amqp_url: str, handler):
        """
        :param str amqp_url: The AMQP url to connect with
        :param handler: object with a ``receive_message(body)`` method; its
            return value decides whether the message gets acknowledged
        :raises Exception: if a required class attribute is missing or the
            handler has no ``receive_message`` attribute
        """
        self._connection = None
        self._channel = None
        self._closing = False  # set by stop(); suppresses reconnects
        self._consumer_tag = None
        self._url = amqp_url
        self.handler = handler
        self._validate_instantiation()

    def run(self):
        """
        Open the connection to RabbitMQ and remember it on the instance.

        This only creates the connection object; it does not start or drive
        an event loop itself.
        """
        self._connection = self.connect()

    def connect(self) -> pika.adapters.AsyncioConnection:
        """
        Create the connection for the configured AMQP url.

        When the connection is established, the on_connection_open method will
        be invoked by pika. This triggers the chain of setup callbacks which
        declare an exchange, a queue and bind said queue.

        :return: the newly created connection object
        """
        # LOGGER.info('Connecting to %s', self._url)
        return adapters.AsyncioConnection(pika.URLParameters(self._url),
                                          self.on_connection_open)

    def on_message(self, _: Channel, basic_deliver: Basic.Deliver,
                   __: BasicProperties, body: str):
        """
        The heart of this class: processes a received message.

        The body is passed to the handler's receive_message method, which is
        expected to return a truthy value when the message was processed. Only
        then is the delivery acknowledged.

        :param _: channel the message arrived on (unused)
        :param basic_deliver: delivery metadata; its delivery_tag is used for the ack
        :param __: message properties (unused)
        :param body: the message payload, handed to the handler as-is
        """
        LOGGER.info(f'Received message #{basic_deliver.delivery_tag}: {body}')
        was_processed = self.handler.receive_message(body)
        # NOTE(review): when the handler returns a falsy value the message is
        # neither acknowledged nor rejected here - confirm this is intended.
        if was_processed:
            self._channel.basic_ack(basic_deliver.delivery_tag)  # acknowledge that the message has been processed

    # Setup methods

    def on_connection_open(self, _: pika.adapters.AsyncioConnection):
        """
        This method is called by pika once the connection to RabbitMQ has
        been established. It passes the handle to the connection object in
        case we need it, but in this case, we'll just mark it unused and work
        with the connection stored on the instance.

        Registers the close callback and requests a new channel.
        """
        self._connection.add_on_close_callback(self.on_connection_closed)
        self._connection.channel(on_open_callback=self.on_channel_open)
        LOGGER.info(f'{datetime.now()} Connected to RabbitMQ')

    def on_channel_open(self, channel: Channel):
        """
        Store the freshly opened channel, register its close callback and
        start declaring the exchange.

        :param channel: the channel object that was opened
        """
        self._channel = channel
        self._channel.add_on_close_callback(self.on_channel_closed)
        self.setup_exchange(self.EXCHANGE)

    def setup_exchange(self, exchange_name: str):
        """
        Declare the exchange; on_successful_exchange_declaration is passed as
        the completion callback.

        :param exchange_name: name of the exchange to declare
        """
        LOGGER.info(f'Declaring exchange {exchange_name}')
        self._channel.exchange_declare(self.on_successful_exchange_declaration,
                                       exchange_name,
                                       self.EXCHANGE_TYPE)

    def on_successful_exchange_declaration(self, _: FrameMethod):
        """
        Callback of the exchange declaration. Declares the queue next.
        """
        # LOGGER.info('Exchange declared')
        self._channel.queue_declare(self.on_successful_queue_declaration, self.QUEUE)

    def on_successful_queue_declaration(self, _: FrameMethod):
        """
        Callback of the queue declaration. Binds the queue to the exchange.
        """
        LOGGER.info(f'Binding {self.EXCHANGE} to queue {self.QUEUE} with key {self.ROUTING_KEY}')
        self._channel.queue_bind(self.on_bindok, self.QUEUE,
                                 self.EXCHANGE, self.ROUTING_KEY)

    def on_bindok(self, _: FrameMethod):
        """
        Invoked when the queue has been successfully bound. Starts the
        consumption of messages and stores the consumer tag, which stop()
        later passes to basic_cancel.
        """
        # Start consuming messages
        self._channel.add_on_cancel_callback(self.on_consumer_cancelled)
        self._consumer_tag = self._channel.basic_consume(self.on_message,
                                                         self.QUEUE)

    # Close connection methods/handlers

    def close_connection(self):
        """This method closes the connection to RabbitMQ."""
        LOGGER.info('Closing connection to RabbitMQ')
        self._connection.close()

    def on_connection_closed(self, _: adapters.AsyncioConnection, reply_code: int, reply_text: str):
        """
        This method is invoked by pika when the connection to RabbitMQ is closed.

        The channel reference is dropped in every case. If the close was not
        requested through stop(), it is treated as unexpected and a reconnect
        is scheduled in 5 seconds.

        :param _: the closed connection (unused)
        :param reply_code: numeric code given for the close
        :param reply_text: text given for the close
        """
        self._channel = None
        if not self._closing:
            LOGGER.warning(f'Connection closed, reopening in 5 seconds: ({reply_code}) {reply_text}')
            # Attempt to reconnect
            self._connection.add_timeout(5, self.reconnect)

    def on_channel_closed(self, channel: Channel, reply_code: int, reply_text: str):
        """
        Channels are usually closed if you attempt to do something that
        violates the protocol, such as re-declare an exchange or queue with
        different parameters. In this case, we'll close the connection
        to shutdown the object.

        This also gets called when we cleanly shutdown the connection from the code.

        :param channel: the channel that was closed
        :param reply_code: numeric code given for the close
        :param reply_text: text given for the close
        """
        LOGGER.warning(f'Channel {channel} was closed: ({reply_code}) {reply_text}')
        self._connection.close()

    def on_consumer_cancelled(self, _: FrameMethod):
        """
        Registered in on_bindok as the channel's cancel callback. Logs the
        received frame and closes the channel if there still is one.

        :param _: the frame passed to the cancel callback
        """
        # The parameter keeps its original name for interface compatibility;
        # the log line previously referenced an undefined `method_frame`,
        # which raised NameError before the channel could be closed.
        method_frame = _
        LOGGER.info(f'Consumer was cancelled remotely, shutting down: {method_frame}')
        if self._channel:
            self._channel.close()

    def on_cancelok(self, _: FrameMethod):
        """
        Passed to basic_cancel in stop() as its callback, so it runs once
        RabbitMQ acknowledges the cancellation of the consumer.

        At this point we will close the channel. This will invoke the
        on_channel_closed method once the channel has been closed, which will
        in-turn close the connection.
        """
        LOGGER.info('RabbitMQ acknowledged the cancellation of the consumer; Closing the channel')
        self._channel.close()

    def stop(self):
        """
        Start a clean shutdown of the connection.

        Marks the instance as closing (which disables reconnects) and, if a
        channel is open, asks RabbitMQ to cancel the consumer. The channel and
        connection are then closed by the on_cancelok / on_channel_closed
        callbacks, i.e. after this method has already returned.
        """
        LOGGER.info('Stopping')
        print('Stopping RabbitMQ')
        self._closing = True
        if self._channel:
            LOGGER.info('Sending a Basic.Cancel RPC command to RabbitMQ')
            self._channel.basic_cancel(self.on_cancelok, self._consumer_tag)
        print('Stopped RabbitMQ')
        LOGGER.info('Stopped')

    def reconnect(self):
        """
        Scheduled by on_connection_closed through the connection's timer.
        Opens a new connection unless a shutdown was requested in the meantime.
        """
        if not self._closing:
            self._connection = self.connect()

    def _validate_instantiation(self):
        """
        Validates that the class was instantiated properly.

        :raises Exception: if one of NEEDED_STATIC_VARIABLES is not defined on
            the class, or the handler lacks a ``receive_message`` attribute
        """
        for var_name in self.NEEDED_STATIC_VARIABLES:
            if not hasattr(self, var_name):
                raise Exception(f'{var_name} needs to be defined for {self.__class__.__name__}')
        if not hasattr(self.handler, 'receive_message'):
            raise Exception('Consumer is required to have defined the receive_message method!')
class NotificationsConsumerConnection(BaseRabbitMQConsumerConnection):
    """
    Concrete consumer connection for notifications.

    It only supplies the four class-level settings that the base class
    requires in order to declare the exchange, declare the queue and bind
    the two together.
    """
    EXCHANGE = 'notifications'    # name of the exchange to declare
    EXCHANGE_TYPE = 'fanout'      # type passed to the exchange declaration
    QUEUE = 'text'                # name of the queue to declare and consume from
    ROUTING_KEY = 'example.text'  # key used when binding the queue to the exchange
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment