Skip to content

Instantly share code, notes, and snippets.

@lukebakken
Created March 6, 2018 23:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lukebakken/a23a21dc638eeb9ceaf6c7da10c1ed4d to your computer and use it in GitHub Desktop.
Save lukebakken/a23a21dc638eeb9ceaf6c7da10c1ed4d to your computer and use it in GitHub Desktop.

Run PerfTest with the following arguments:

--rate 1 --producers 1 --consumers 1 --exchange gh-976 --routing-key gh-976
import logging
import os
import socket
import json
import importlib
import pika
logger = logging.getLogger("ipacc")
class AMQPconsume(object):
EXCHANGE = None
EXCHANGE_TYPE = None
QUEUE = None
ROUTING_KEY = None
CACHE = None
LOOKUP = None
PLUGIN = None
LABEL = None
def __init__(self, amqp_url):
self._connection = None
self._channel = None
self._closing = False
self._consumer_tag = None
self._url = amqp_url
def connect(self):
logger.info('Connecting to %s', self._url)
return pika.SelectConnection(pika.URLParameters(self._url), self.on_connection_open, stop_ioloop_on_close=False)
def on_connection_open(self, unused_connection):
logger.info('Connection opened')
self.add_on_connection_close_callback()
self.open_channel()
def add_on_connection_close_callback(self):
logger.info('Adding connection close callback')
self._connection.add_on_close_callback(self.on_connection_closed)
def on_connection_closed(self, connection, reply_code, reply_text):
self._channel = None
if self._closing:
self._connection.ioloop.stop()
else:
logger.warning('Connection closed, reopening in 5 seconds: (%s) %s', reply_code, reply_text)
self._connection.add_timeout(5, self.reconnect)
def reconnect(self):
self._connection.ioloop.stop()
if not self._closing:
self._connection = self.connect()
self._connection.ioloop.start()
def open_channel(self):
logger.info('Creating a new channel')
self._connection.channel(on_open_callback=self.on_channel_open)
def on_channel_open(self, channel):
logger.info('Channel opened')
self._channel = channel
self.add_on_channel_close_callback()
self.setup_exchange(self.EXCHANGE)
def add_on_channel_close_callback(self):
logger.info('Adding channel close callback')
self._channel.add_on_close_callback(self.on_channel_closed)
def on_channel_closed(self, channel, reply_code, reply_text):
logger.warning('Channel %i was closed: (%s) %s', channel, reply_code, reply_text)
self._connection.close()
def setup_exchange(self, exchange_name):
logger.info('Declaring exchange %s', exchange_name)
self._channel.exchange_declare(self.on_exchange_declareok, exchange_name, self.EXCHANGE_TYPE)
def on_exchange_declareok(self, unused_frame):
logger.info('Exchange declared')
self.setup_queue(self.QUEUE)
def setup_queue(self, queue_name):
logger.info('Declaring queue %s', queue_name)
self._channel.queue_declare(self.on_queue_declareok, queue_name)
def on_queue_declareok(self, method_frame):
logger.info('Binding %s to %s with %s', self.EXCHANGE, self.QUEUE, self.ROUTING_KEY)
self._channel.queue_bind(self.on_bindok, self.QUEUE, self.EXCHANGE, self.ROUTING_KEY)
def on_bindok(self, unused_frame):
logger.info('Queue bound')
self.start_consuming()
def start_consuming(self):
logger.info('Issuing consumer related RPC commands')
self.add_on_cancel_callback()
self._channel.basic_qos(prefetch_count=1)
self._consumer_tag = self._channel.basic_consume(self.on_message, self.QUEUE)
def add_on_cancel_callback(self):
logger.info('Adding consumer cancellation callback')
self._channel.add_on_cancel_callback(self.on_consumer_cancelled)
def on_consumer_cancelled(self, method_frame):
logger.info('Consumer was cancelled remotely, shutting down: %r', method_frame)
if self._channel:
self._channel.close()
def on_message(self, unused_channel, basic_deliver, properties, body):
logger.info('#%s received message %s', self.LABEL, basic_deliver.delivery_tag)
self.acknowledge_message(basic_deliver.delivery_tag)
def acknowledge_message(self, delivery_tag):
logger.info('#%s acknowledging message %s', self.LABEL, delivery_tag)
self._channel.basic_ack(delivery_tag)
def stop_consuming(self):
if self._channel:
logger.info('Sending a Basic.Cancel RPC command to RabbitMQ')
self._channel.basic_cancel(self.on_cancelok, self._consumer_tag)
def on_cancelok(self, unused_frame):
logger.info('RabbitMQ acknowledged the cancellation of the consumer')
self.close_channel()
def close_channel(self):
logger.info('Closing the channel')
self._channel.close()
def run(self):
self._connection = self.connect()
self._connection.ioloop.start()
def stop(self):
logger.info('Stopping')
self._closing = True
self.stop_consuming()
self._connection.ioloop.start()
logger.info('Stopped')
def close_connection(self):
logger.info('Closing connection')
self._connection.close()
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
worker = AMQPconsume('amqp://guest:guest@localhost:5672/%2F')
worker.EXCHANGE = 'gh-976'
worker.EXCHANGE_TYPE= 'direct'
worker.ROUTING_KEY = 'gh-976'
worker.QUEUE = 'queue-gh-976'
worker.LABEL = 'worker-gh-976'
try:
worker.run()
except Exception as e:
logger.error('consuming `pmacct` caused `{}`'.format(repr(e)), exc_info=True)
worker.stop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment