Run PerfTest with the following arguments:
--rate 1 --producers 1 --consumers 1 --exchange gh-976 --routing-key gh-976
import logging | |
import os | |
import socket | |
import json | |
import importlib | |
import pika | |
logger = logging.getLogger("ipacc") | |
class AMQPconsume(object): | |
EXCHANGE = None | |
EXCHANGE_TYPE = None | |
QUEUE = None | |
ROUTING_KEY = None | |
CACHE = None | |
LOOKUP = None | |
PLUGIN = None | |
LABEL = None | |
def __init__(self, amqp_url): | |
self._connection = None | |
self._channel = None | |
self._closing = False | |
self._consumer_tag = None | |
self._url = amqp_url | |
def connect(self): | |
logger.info('Connecting to %s', self._url) | |
return pika.SelectConnection(pika.URLParameters(self._url), self.on_connection_open, stop_ioloop_on_close=False) | |
def on_connection_open(self, unused_connection): | |
logger.info('Connection opened') | |
self.add_on_connection_close_callback() | |
self.open_channel() | |
def add_on_connection_close_callback(self): | |
logger.info('Adding connection close callback') | |
self._connection.add_on_close_callback(self.on_connection_closed) | |
def on_connection_closed(self, connection, reply_code, reply_text): | |
self._channel = None | |
if self._closing: | |
self._connection.ioloop.stop() | |
else: | |
logger.warning('Connection closed, reopening in 5 seconds: (%s) %s', reply_code, reply_text) | |
self._connection.add_timeout(5, self.reconnect) | |
def reconnect(self): | |
self._connection.ioloop.stop() | |
if not self._closing: | |
self._connection = self.connect() | |
self._connection.ioloop.start() | |
def open_channel(self): | |
logger.info('Creating a new channel') | |
self._connection.channel(on_open_callback=self.on_channel_open) | |
def on_channel_open(self, channel): | |
logger.info('Channel opened') | |
self._channel = channel | |
self.add_on_channel_close_callback() | |
self.setup_exchange(self.EXCHANGE) | |
def add_on_channel_close_callback(self): | |
logger.info('Adding channel close callback') | |
self._channel.add_on_close_callback(self.on_channel_closed) | |
def on_channel_closed(self, channel, reply_code, reply_text): | |
logger.warning('Channel %i was closed: (%s) %s', channel, reply_code, reply_text) | |
self._connection.close() | |
def setup_exchange(self, exchange_name): | |
logger.info('Declaring exchange %s', exchange_name) | |
self._channel.exchange_declare(self.on_exchange_declareok, exchange_name, self.EXCHANGE_TYPE) | |
def on_exchange_declareok(self, unused_frame): | |
logger.info('Exchange declared') | |
self.setup_queue(self.QUEUE) | |
def setup_queue(self, queue_name): | |
logger.info('Declaring queue %s', queue_name) | |
self._channel.queue_declare(self.on_queue_declareok, queue_name) | |
def on_queue_declareok(self, method_frame): | |
logger.info('Binding %s to %s with %s', self.EXCHANGE, self.QUEUE, self.ROUTING_KEY) | |
self._channel.queue_bind(self.on_bindok, self.QUEUE, self.EXCHANGE, self.ROUTING_KEY) | |
def on_bindok(self, unused_frame): | |
logger.info('Queue bound') | |
self.start_consuming() | |
def start_consuming(self): | |
logger.info('Issuing consumer related RPC commands') | |
self.add_on_cancel_callback() | |
self._channel.basic_qos(prefetch_count=1) | |
self._consumer_tag = self._channel.basic_consume(self.on_message, self.QUEUE) | |
def add_on_cancel_callback(self): | |
logger.info('Adding consumer cancellation callback') | |
self._channel.add_on_cancel_callback(self.on_consumer_cancelled) | |
def on_consumer_cancelled(self, method_frame): | |
logger.info('Consumer was cancelled remotely, shutting down: %r', method_frame) | |
if self._channel: | |
self._channel.close() | |
def on_message(self, unused_channel, basic_deliver, properties, body): | |
logger.info('#%s received message %s', self.LABEL, basic_deliver.delivery_tag) | |
self.acknowledge_message(basic_deliver.delivery_tag) | |
def acknowledge_message(self, delivery_tag): | |
logger.info('#%s acknowledging message %s', self.LABEL, delivery_tag) | |
self._channel.basic_ack(delivery_tag) | |
def stop_consuming(self): | |
if self._channel: | |
logger.info('Sending a Basic.Cancel RPC command to RabbitMQ') | |
self._channel.basic_cancel(self.on_cancelok, self._consumer_tag) | |
def on_cancelok(self, unused_frame): | |
logger.info('RabbitMQ acknowledged the cancellation of the consumer') | |
self.close_channel() | |
def close_channel(self): | |
logger.info('Closing the channel') | |
self._channel.close() | |
def run(self): | |
self._connection = self.connect() | |
self._connection.ioloop.start() | |
def stop(self): | |
logger.info('Stopping') | |
self._closing = True | |
self.stop_consuming() | |
self._connection.ioloop.start() | |
logger.info('Stopped') | |
def close_connection(self): | |
logger.info('Closing connection') | |
self._connection.close() | |
if __name__ == '__main__': | |
logging.basicConfig(level=logging.INFO) | |
worker = AMQPconsume('amqp://guest:guest@localhost:5672/%2F') | |
worker.EXCHANGE = 'gh-976' | |
worker.EXCHANGE_TYPE= 'direct' | |
worker.ROUTING_KEY = 'gh-976' | |
worker.QUEUE = 'queue-gh-976' | |
worker.LABEL = 'worker-gh-976' | |
try: | |
worker.run() | |
except Exception as e: | |
logger.error('consuming `pmacct` caused `{}`'.format(repr(e)), exc_info=True) | |
worker.stop() |