Skip to content

Instantly share code, notes, and snippets.

@140am
Last active August 12, 2016 22:28
Show Gist options
  • Save 140am/0a950d0104e2cd5d696d to your computer and use it in GitHub Desktop.
Save 140am/0a950d0104e2cd5d696d to your computer and use it in GitHub Desktop.
Example of a helper to listen & debug the OpenStack MQ event bus
import logging
import json
import dateutil.parser
import pika
FORMAT = '%(asctime)-15s %(levelname)s: %(message)s'
logging.basicConfig(
format = FORMAT,
level = logging.DEBUG
)
log = logging.getLogger(__name__)
class OpenstackMonitor(object):
def __init__(self, host):
self.connection = None
self.channel = None
self.amqp_host = host
self.connect_server()
def connect_server(self):
parameters = pika.ConnectionParameters(self.amqp_host)
self.connection = pika.SelectConnection(
parameters, self.on_connected
)
def process_msg(self, channel, method, header, body):
if 'node' in header.headers:
log.warn('node: %s | exchange: %s | route: %s' % (
header.headers['node'], method.routing_key,
header.headers['routing_keys']
))
decode_body = json.loads(body)
if '_context_timestampSS' in decode_body:
try:
time_stamp = dateutil.parser.parse(decode_body['_context_timestamp'])
except:
import pdb
pdb.set_trace()
if not decode_body['method'] == 'update_service_capabilities':
log.warn('service: %s' % decode_body['args']['service_name'])
log.info('%s | host: %s | vCPU: %s/%s | mem: %s/%s | disk: %s/%s' % (
time_stamp.strftime('%Y-%m-%d %H:%M:%S'),
decode_body['args']['host'],
decode_body['args']['capabilities']['vcpus'] - decode_body['args']['capabilities']['vcpus_used'],
decode_body['args']['capabilities']['vcpus'],
decode_body['args']['capabilities']['host_memory_free'],
decode_body['args']['capabilities']['host_memory_total'],
decode_body['args']['capabilities']['disk_available'],
decode_body['args']['capabilities']['disk_total']
))
else:
log.info('MSG: %s' % decode_body)
channel.basic_ack(method.delivery_tag)
def on_queue_deleted(self, method):
log.debug('queue deleted with %i messages' % method.method.message_count)
def on_closed(self, frame):
log.debug('AMQP connection closed')
self.connection.ioloop.stop()
def on_queue_declared(self, frame):
log.debug('Queue declared')
self.channel.queue_bind(
exchange = 'amq.rabbitmq.trace',
queue = 'test',
routing_key = '#'
)
self.channel.basic_consume(
self.process_msg,
queue = 'test'
)
def on_channel_open(self, channel_):
log.debug('Channel open')
self.channel = channel_
self.channel.basic_qos(
prefetch_count = 1
)
self.channel.queue_declare(
queue = 'test',
durable = True,
exclusive = False,
auto_delete = True,
callback = self.on_queue_declared
)
def on_connected(self, connection):
log.debug('connected to AMQP server')
self.connection.add_on_close_callback(self.on_closed)
self.connection.channel(self.on_channel_open)
def tear_down(self):
#self.channel.queue_delete(
# self.on_queue_deleted,
# queue = 'test'
#)
self.connection.close()
self.connection.ioloop.start()
def run(self):
try:
self.connection.ioloop.start()
except KeyboardInterrupt:
self.tear_down()
tt = OpenstackMonitor(host = '10.10.10.10')
tt.run()
import logging
import json
import pika
FORMAT = '%(asctime)-15s %(levelname)s: %(message)s'
logging.basicConfig(
format = FORMAT,
level = logging.DEBUG
)
log = logging.getLogger(__name__)
class OpenstackMonitor(object):
def __init__(self, host, callback):
self.amqp_host = host
self.connection = None
self.channel = None
self.callback = callback
self.connect_server()
def connect_server(self):
parameters = pika.ConnectionParameters(self.amqp_host)
self.connection = pika.SelectConnection(
parameters, self.on_connected
)
def process_msg(self, channel, method, header, body):
decode_body = json.loads(body)
log.info('MSG: %s' % method)
if method.exchange == 'quantum' and method.routing_key == 'notifications.info':
# new network port has been created
if decode_body['event_type'] == 'port.create.end':
log.info('new port: %s' % decode_body['payload'])
else:
log.info('event: %s: %s' % (
decode_body['event_type'],
decode_body['payload']
))
else:
log.info('MSG: %s' % decode_body)
self.callback(decode_body)
channel.basic_ack(method.delivery_tag)
def on_queue_deleted(self, method):
log.debug('queue deleted with %i messages' % method.method.message_count)
def on_closed(self, frame):
log.debug('AMQP connection closed')
self.connection.ioloop.stop()
def on_queue_declared(self, frame):
log.debug('Queue declared')
self.channel.queue_bind(
exchange = 'quantum',
queue = 'quantum_monitor',
routing_key = 'notifications.info'
)
self.channel.basic_consume(
self.process_msg,
queue = 'quantum_monitor'
)
def on_channel_open(self, channel_):
log.debug('Channel open')
self.channel = channel_
self.channel.basic_qos(
prefetch_count = 1
)
self.channel.queue_declare(
queue = 'quantum_monitor',
durable = True,
exclusive = False,
auto_delete = True,
callback = self.on_queue_declared
)
def on_connected(self, connection):
log.debug('connected to AMQP server')
self.connection.add_on_close_callback(self.on_closed)
self.connection.channel(self.on_channel_open)
def tear_down(self):
#self.channel.queue_delete(
# self.on_queue_deleted,
# queue = 'test'
#)
self.connection.close()
self.connection.ioloop.start()
def run(self):
try:
self.connection.ioloop.start()
except KeyboardInterrupt:
self.tear_down()
def call_back(msg):
log.info('callback MSG: %s' % msg)
tt = OpenstackMonitor(
host = '10.10.10.10',
callback = call_back
)
tt.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment