Skip to content

Instantly share code, notes, and snippets.

@billyshambrook
Created November 10, 2015 17:06
Show Gist options
  • Save billyshambrook/55dfe505e842e6dfffb0 to your computer and use it in GitHub Desktop.
Save billyshambrook/55dfe505e842e6dfffb0 to your computer and use it in GitHub Desktop.
Multiple channel pika consumer
import logging
import time
import pika
logger = logging.getLogger(__name__)
class Channel(object):
    """One consumer channel opened on a shared connection.

    An instance asks the connection for a channel, declares the ``EXCHANGE``
    exchange, declares a queue named after its identifier, binds that queue
    to the exchange with ``ROUTING_KEY`` and finally starts consuming from it.
    Each step is started from the completion callback of the previous one.
    """

    EXCHANGE = 'message'
    EXCHANGE_TYPE = 'topic'
    # Queue name template; ``{}`` is replaced by the channel identifier.
    QUEUE = 'text{}'
    ROUTING_KEY = 'example.text'

    def __init__(self, connection, identifier):
        """Remember the connection to open a channel on and this channel's id.

        :param connection: connection object providing ``channel()`` and
            ``close()``.
        :param identifier: value used in log prefixes and in the queue name.
        """
        self._connection = connection
        # Set once the channel is opened; initialised here so that the
        # ``if self._channel`` guards below are safe before that happens.
        self._channel = None
        self._consumer_tag = None
        self._id = identifier

    def _queue_name(self):
        """Return the name of the queue owned by this channel."""
        return self.QUEUE.format(self._id)

    def run(self):
        """Request a new channel; ``on_open_callback`` receives it."""
        self._connection.channel(on_open_callback=self.on_open_callback)

    def on_open_callback(self, channel):
        """Store the opened channel, watch for its closure, declare the exchange.

        :param channel: the channel object that was opened.
        """
        logger.info('[{}] Channel opened'.format(self._id))
        self._channel = channel
        self.add_on_channel_close_callback()
        self.setup_exchange(self.EXCHANGE)

    def add_on_channel_close_callback(self):
        """Register ``on_channel_closed`` as the channel's close callback."""
        logger.info('[{}] Adding channel close callback'.format(self._id))
        self._channel.add_on_close_callback(self.on_channel_closed)

    def on_channel_closed(self, channel, reply_code, reply_text):
        """Log the closure, drop the channel reference, close the connection.

        The connection is the one handed to ``__init__``, so closing it
        affects every user of that connection, not only this channel.

        :param channel: the channel that was closed.
        :param reply_code: code reported with the closure.
        :param reply_text: text reported with the closure.
        """
        logger.warning('[{}] channel {} was closed: ({}) {}'.format(
            self._id, channel, reply_code, reply_text))
        # The channel is no longer usable; clearing it keeps the guards in
        # stop_consuming/close_channel from touching it again.
        self._channel = None
        self._connection.close()

    def setup_exchange(self, exchange_name):
        """Declare ``exchange_name``; ``on_exchange_declareok`` continues.

        :param exchange_name: name of the exchange to declare.
        """
        logger.info('[{}] Declaring exchange {}'.format(self._id, exchange_name))
        self._channel.exchange_declare(
            self.on_exchange_declareok, exchange_name, self.EXCHANGE_TYPE)

    def on_exchange_declareok(self, unused_frame):
        """Continue with the queue declaration once the exchange is declared.

        :param unused_frame: callback argument, not used.
        """
        logger.info('[{}] Exchange declared'.format(self._id))
        self.setup_queue(self._queue_name())

    def setup_queue(self, queue_name):
        """Declare ``queue_name``; ``on_queue_declareok`` continues.

        :param queue_name: name of the queue to declare.
        """
        logger.info('[{}] Declaring queue {}'.format(self._id, queue_name))
        self._channel.queue_declare(self.on_queue_declareok, queue_name)

    def on_queue_declareok(self, method_frame):
        """Bind this channel's queue to ``EXCHANGE`` using ``ROUTING_KEY``.

        :param method_frame: callback argument, not used.
        """
        queue_name = self._queue_name()
        logger.info('[{}] Binding {} to {} with {}'.format(
            self._id, self.EXCHANGE, queue_name, self.ROUTING_KEY))
        self._channel.queue_bind(
            self.on_bindok, queue_name, self.EXCHANGE, self.ROUTING_KEY)

    def on_bindok(self, unused_frame):
        """Start consuming once the queue is bound.

        :param unused_frame: callback argument, not used.
        """
        # The original logged the raw template because .format() was missing.
        logger.info('[{}] Queue bound'.format(self._id))
        self.start_consuming()

    def start_consuming(self):
        """Register the cancel callback and start consuming from the queue.

        The value returned by ``basic_consume`` is kept as the consumer tag
        so that ``stop_consuming`` can cancel this consumer later.
        """
        logger.info('[{}] Issing consumer related RPC commands'.format(self._id))
        self.add_on_cancel_callback()
        self._consumer_tag = self._channel.basic_consume(
            self.on_message, self._queue_name())

    def add_on_cancel_callback(self):
        """Register ``on_consumer_cancelled`` as the channel's cancel callback."""
        logger.info('[{}] Adding consumer cancellation callback'.format(self._id))
        self._channel.add_on_cancel_callback(self.on_consumer_cancelled)

    def on_consumer_cancelled(self, method_frame):
        """Close the channel after the consumer was cancelled remotely.

        :param method_frame: callback argument, only logged.
        """
        logger.info('[{}] Consumer was cancelled remotely, shutting down {}'.format(
            self._id, method_frame))
        if self._channel:
            self._channel.close()

    def on_message(self, unused_channel, basic_deliver, properties, body):
        """Log a delivered message, wait five seconds, then acknowledge it.

        :param unused_channel: callback argument, not used.
        :param basic_deliver: delivery info; ``delivery_tag`` is read from it.
        :param properties: message properties; ``app_id`` is read from it.
        :param body: message payload, only logged.
        """
        logger.info('[{}] Received message # {} from {}: {}'.format(
            self._id, basic_deliver.delivery_tag, properties.app_id, body))
        # NOTE(review): this sleeps inside the delivery callback; presumably
        # it stands in for slow work and stalls whatever invokes the
        # callback for the duration - confirm that this is intended.
        time.sleep(5)
        self.acknowledge_message(basic_deliver.delivery_tag)

    def acknowledge_message(self, delivery_tag):
        """Acknowledge the message identified by ``delivery_tag``.

        :param delivery_tag: tag taken from the delivery being acknowledged.
        """
        logger.info('[{}] Acknowledging message {}'.format(self._id, delivery_tag))
        self._channel.basic_ack(delivery_tag)

    def stop_consuming(self):
        """Cancel the consumer if a channel is open; ``on_cancelok`` continues.

        Does nothing when no channel has been opened (or it is already closed).
        """
        if self._channel:
            logger.info('[{}] Sending a Basic.Cancel RPC command to RabbitMQ'.format(self._id))
            self._channel.basic_cancel(self.on_cancelok, self._consumer_tag)

    def on_cancelok(self, unused_frame):
        """Close the channel once the cancellation has been acknowledged.

        :param unused_frame: callback argument, not used.
        """
        logger.info('[{}] RabbitMQ acknowledged the cancellation of the consumer'.format(self._id))
        self.close_channel()

    def close_channel(self):
        """Close the channel if there is one; otherwise do nothing."""
        if self._channel:
            logger.info('[{}] Closing channel'.format(self._id))
            self._channel.close()
class Consumer(object):
    """Owns one connection and the ``Channel`` consumers opened on it.

    The connection is reopened five seconds after it closes, unless
    ``stop()`` has been called.
    """

    def __init__(self, amqp_url, channel_count=2):
        """Store the settings; no connection is made until ``run()``.

        :param amqp_url: URL handed to ``pika.URLParameters``.
        :param channel_count: number of ``Channel`` objects opened each time
            the connection opens (defaults to the previously fixed 2).
        """
        self._connection = None
        self._closing = False
        self._channels = []
        self._url = amqp_url
        self._channel_count = channel_count

    def run(self):
        """Connect and run the connection's I/O loop."""
        self._connection = self.connect()
        self._connection.ioloop.start()

    def connect(self):
        """Create the connection; ``on_connection_open`` is its open callback.

        :returns: the new ``pika.SelectConnection``.
        """
        logger.info('Connecting to {}'.format(self._url))
        return pika.SelectConnection(
            pika.URLParameters(self._url),
            self.on_connection_open,
            stop_ioloop_on_close=False)

    def on_connection_open(self, unused_connection):
        """Register the close callback, then open the channels.

        If ``stop()`` was requested before the connection finished opening,
        the connection is closed again instead of opening channels, so that
        the I/O loop started by ``stop()`` can terminate.

        :param unused_connection: callback argument, not used.
        """
        logger.info('Connection opened')
        self.add_on_connection_close_callback()
        if self._closing:
            self.close_connection()
            return
        self.open_channels()

    def add_on_connection_close_callback(self):
        """Register ``on_connection_closed`` as the connection's close callback."""
        logger.info('Adding connection close callback')
        self._connection.add_on_close_callback(self.on_connection_closed)

    def on_connection_closed(self, connection, reply_code, reply_text):
        """Forget the channels, then stop the loop or schedule a reconnect.

        :param connection: the closed connection (not used).
        :param reply_code: code reported with the closure.
        :param reply_text: text reported with the closure.
        """
        self._channels = []
        if self._closing:
            self._connection.ioloop.stop()
        else:
            logger.warning('Connection closed, reopening in 5 seconds: ({}) {}'.format(
                reply_code, reply_text))
            self._connection.add_timeout(5, self.reconnect)

    def reconnect(self):
        """Stop the old I/O loop and, unless stopping, connect and loop again."""
        self._connection.ioloop.stop()
        if not self._closing:
            self._connection = self.connect()
            self._connection.ioloop.start()

    def open_channels(self):
        """Create, start and remember ``channel_count`` ``Channel`` objects.

        Channels are identified by the strings ``'0'``, ``'1'``, ...
        """
        logger.info('Creating new channels.')
        for index in range(self._channel_count):
            channel = Channel(self._connection, str(index))
            channel.run()
            self._channels.append(channel)

    def stop(self):
        """Ask every channel to stop consuming and run the loop until done.

        Safe to call before a connection exists (for example when ``run()``
        was interrupted while still connecting); in that case only the
        closing flag is set.
        """
        logger.info('Stopping')
        self._closing = True
        if self._connection is not None:
            for channel in self._channels:
                channel.stop_consuming()
            # The loop is restarted so the cancel/close callbacks can be
            # delivered; on_connection_closed stops it again.
            self._connection.ioloop.start()
        logger.info('Stopped')

    def close_connection(self):
        """Close the connection."""
        logger.info('Closing connection')
        self._connection.close()
def main(amqp_url='amqp://guest:guest@localhost:5672/%2F'):
    """Entrypoint: configure logging and run a consumer until interrupted.

    :param amqp_url: URL passed to ``Consumer``; defaults to the URL that
        was previously hard-coded here.
    """
    logging.basicConfig(level=logging.INFO, format='%(message)s')
    consumer = Consumer(amqp_url)
    try:
        consumer.run()
    except KeyboardInterrupt:
        # Ctrl-C: let the consumer shut down instead of dying mid-loop.
        consumer.stop()


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment