Last active
August 12, 2016 22:28
-
-
Save 140am/0a950d0104e2cd5d696d to your computer and use it in GitHub Desktop.
Example of a helper to listen & debug the OpenStack MQ event bus
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import json | |
import dateutil.parser | |
import pika | |
FORMAT = '%(asctime)-15s %(levelname)s: %(message)s' | |
logging.basicConfig( | |
format = FORMAT, | |
level = logging.DEBUG | |
) | |
log = logging.getLogger(__name__) | |
class OpenstackMonitor(object): | |
def __init__(self, host): | |
self.connection = None | |
self.channel = None | |
self.amqp_host = host | |
self.connect_server() | |
def connect_server(self): | |
parameters = pika.ConnectionParameters(self.amqp_host) | |
self.connection = pika.SelectConnection( | |
parameters, self.on_connected | |
) | |
def process_msg(self, channel, method, header, body): | |
if 'node' in header.headers: | |
log.warn('node: %s | exchange: %s | route: %s' % ( | |
header.headers['node'], method.routing_key, | |
header.headers['routing_keys'] | |
)) | |
decode_body = json.loads(body) | |
if '_context_timestampSS' in decode_body: | |
try: | |
time_stamp = dateutil.parser.parse(decode_body['_context_timestamp']) | |
except: | |
import pdb | |
pdb.set_trace() | |
if not decode_body['method'] == 'update_service_capabilities': | |
log.warn('service: %s' % decode_body['args']['service_name']) | |
log.info('%s | host: %s | vCPU: %s/%s | mem: %s/%s | disk: %s/%s' % ( | |
time_stamp.strftime('%Y-%m-%d %H:%M:%S'), | |
decode_body['args']['host'], | |
decode_body['args']['capabilities']['vcpus'] - decode_body['args']['capabilities']['vcpus_used'], | |
decode_body['args']['capabilities']['vcpus'], | |
decode_body['args']['capabilities']['host_memory_free'], | |
decode_body['args']['capabilities']['host_memory_total'], | |
decode_body['args']['capabilities']['disk_available'], | |
decode_body['args']['capabilities']['disk_total'] | |
)) | |
else: | |
log.info('MSG: %s' % decode_body) | |
channel.basic_ack(method.delivery_tag) | |
def on_queue_deleted(self, method): | |
log.debug('queue deleted with %i messages' % method.method.message_count) | |
def on_closed(self, frame): | |
log.debug('AMQP connection closed') | |
self.connection.ioloop.stop() | |
def on_queue_declared(self, frame): | |
log.debug('Queue declared') | |
self.channel.queue_bind( | |
exchange = 'amq.rabbitmq.trace', | |
queue = 'test', | |
routing_key = '#' | |
) | |
self.channel.basic_consume( | |
self.process_msg, | |
queue = 'test' | |
) | |
def on_channel_open(self, channel_): | |
log.debug('Channel open') | |
self.channel = channel_ | |
self.channel.basic_qos( | |
prefetch_count = 1 | |
) | |
self.channel.queue_declare( | |
queue = 'test', | |
durable = True, | |
exclusive = False, | |
auto_delete = True, | |
callback = self.on_queue_declared | |
) | |
def on_connected(self, connection): | |
log.debug('connected to AMQP server') | |
self.connection.add_on_close_callback(self.on_closed) | |
self.connection.channel(self.on_channel_open) | |
def tear_down(self): | |
#self.channel.queue_delete( | |
# self.on_queue_deleted, | |
# queue = 'test' | |
#) | |
self.connection.close() | |
self.connection.ioloop.start() | |
def run(self): | |
try: | |
self.connection.ioloop.start() | |
except KeyboardInterrupt: | |
self.tear_down() | |
tt = OpenstackMonitor(host = '10.10.10.10') | |
tt.run() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import json | |
import pika | |
FORMAT = '%(asctime)-15s %(levelname)s: %(message)s' | |
logging.basicConfig( | |
format = FORMAT, | |
level = logging.DEBUG | |
) | |
log = logging.getLogger(__name__) | |
class OpenstackMonitor(object): | |
def __init__(self, host, callback): | |
self.amqp_host = host | |
self.connection = None | |
self.channel = None | |
self.callback = callback | |
self.connect_server() | |
def connect_server(self): | |
parameters = pika.ConnectionParameters(self.amqp_host) | |
self.connection = pika.SelectConnection( | |
parameters, self.on_connected | |
) | |
def process_msg(self, channel, method, header, body): | |
decode_body = json.loads(body) | |
log.info('MSG: %s' % method) | |
if method.exchange == 'quantum' and method.routing_key == 'notifications.info': | |
# new network port has been created | |
if decode_body['event_type'] == 'port.create.end': | |
log.info('new port: %s' % decode_body['payload']) | |
else: | |
log.info('event: %s: %s' % ( | |
decode_body['event_type'], | |
decode_body['payload'] | |
)) | |
else: | |
log.info('MSG: %s' % decode_body) | |
self.callback(decode_body) | |
channel.basic_ack(method.delivery_tag) | |
def on_queue_deleted(self, method): | |
log.debug('queue deleted with %i messages' % method.method.message_count) | |
def on_closed(self, frame): | |
log.debug('AMQP connection closed') | |
self.connection.ioloop.stop() | |
def on_queue_declared(self, frame): | |
log.debug('Queue declared') | |
self.channel.queue_bind( | |
exchange = 'quantum', | |
queue = 'quantum_monitor', | |
routing_key = 'notifications.info' | |
) | |
self.channel.basic_consume( | |
self.process_msg, | |
queue = 'quantum_monitor' | |
) | |
def on_channel_open(self, channel_): | |
log.debug('Channel open') | |
self.channel = channel_ | |
self.channel.basic_qos( | |
prefetch_count = 1 | |
) | |
self.channel.queue_declare( | |
queue = 'quantum_monitor', | |
durable = True, | |
exclusive = False, | |
auto_delete = True, | |
callback = self.on_queue_declared | |
) | |
def on_connected(self, connection): | |
log.debug('connected to AMQP server') | |
self.connection.add_on_close_callback(self.on_closed) | |
self.connection.channel(self.on_channel_open) | |
def tear_down(self): | |
#self.channel.queue_delete( | |
# self.on_queue_deleted, | |
# queue = 'test' | |
#) | |
self.connection.close() | |
self.connection.ioloop.start() | |
def run(self): | |
try: | |
self.connection.ioloop.start() | |
except KeyboardInterrupt: | |
self.tear_down() | |
def call_back(msg): | |
log.info('callback MSG: %s' % msg) | |
tt = OpenstackMonitor( | |
host = '10.10.10.10', | |
callback = call_back | |
) | |
tt.run() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment