Last active
January 4, 2016 08:09
-
-
Save kvdveer/8593166 to your computer and use it in GitHub Desktop.
Openstack firehose client
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# Copyright 2014 CloudVPS | |
# Author: Koert van der Veer <koert@cloudvps.com> | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
# Be sure to set up a firehose: http://www.rabbitmq.com/firehose.html | |
from __future__ import print_function | |
import json | |
import pika | |
import datetime | |
import re | |
import argparse | |
try: | |
from jsonpath_rw import parse as jsonpath_parse | |
except ImportError: | |
jsonpath_parse = None | |
def callback(channel, method, properties, body): | |
try: | |
channel.message_count += 1 | |
data = json.loads(body) | |
if "oslo.message" in data: | |
data = json.loads(data['oslo.message']) | |
unique_id = data.get("_unique_id") | |
if unique_id in channel.message_ids: | |
return | |
if channel.show_all: | |
show_message(channel, method, properties, data) | |
return | |
if channel.args.project == data.get('_context_project_id'): | |
show_message(channel, method, properties, data) | |
return | |
if channel.args.project == data.get('_context_project_name'): | |
show_message(channel, method, properties, data) | |
return | |
if channel.args.user == data.get('_context_user_id'): | |
show_message(channel, method, properties, data) | |
return | |
if channel.args.exchange: | |
exchange = properties.headers['exchange_name'] | |
if channel.args.exchange.match(exchange): | |
show_message(channel, method, properties, data) | |
return | |
if channel.args.related: | |
identifier = data.get('_context_request_id') | |
if identifier in channel.request_identifiers: | |
show_message(channel, method, properties, data) | |
return | |
identifier = data.get('_msg_id') | |
if identifier in channel.request_identifiers: | |
show_message(channel, method, properties, data) | |
return | |
if channel.args.grep: | |
grep = channel.args.grep | |
if grep in json.dumps(data) or grep in repr(properties.headers): | |
show_message(channel, method, properties, data) | |
return | |
except Exception as e: | |
print(e) | |
def show_message(channel, method, properties, body): | |
channel.filter_count += 1 | |
if channel.args.related: | |
if '_context_request_id' in body: | |
channel.request_identifiers.add(body['_context_request_id']) | |
if '_msg_id' in body: | |
channel.request_identifiers.add(body['_msg_id']) | |
if not channel.args.duplicates: | |
unique_id = body.get("_unique_id") | |
channel.message_ids.add(unique_id) | |
if not channel.args.context: | |
for key in body.keys(): | |
if key.startswith("_context_"): | |
del body[key] | |
if not channel.args.private: | |
for key in body.keys(): | |
if key.startswith("_") and not key.startswith('_context_'): | |
del body[key] | |
if channel.args.timestamp: | |
print("# TIME: %s" % datetime.datetime.now()) | |
if channel.args.amqp: | |
indent = "#" + 16*" " | |
for k, v in sorted(properties.headers.items()): | |
value = channel.dump_short(v) | |
value = value.replace('\n', '\n' + indent).rstrip("\n# ") | |
print("#%-14s: %s" % (k, value)) | |
if channel.args.fields: | |
for fld in channel.args.fields: | |
title = str(fld) | |
matches = fld.find(body) | |
for match in matches: | |
value = channel.dump_short(match.value) | |
value = value.replace('\n', '\n' + " "*(len(title)+2)).strip() | |
print("%s: %s" % (fld, value)) | |
if not matches: | |
print("%s not found" % (fld)) | |
else: | |
print(channel.dump_long(body)) | |
print() | |
def jp_parse(value): | |
return jsonpath_parse(value) | |
def main(): | |
parser = argparse.ArgumentParser() | |
parser.add_argument("-r", "--related", action="store_true", | |
help="Include messages related to the accepted ones") | |
parser.add_argument("-p", "--project", type=str, default='-', | |
help="Include messages for selected project") | |
parser.add_argument("-u", "--user", type=str, default='-', | |
help="Include messages for selected userid") | |
parser.add_argument("-c", "--context", action="store_true", | |
help="Show context fields of messages") | |
parser.add_argument("-P", "--private", action="store_true", | |
help="Show hidden fields of messages") | |
parser.add_argument("-a", "--amqp", action="store_true", | |
help="Show AMQP headers of messages") | |
parser.add_argument("-t", "--timestamp", action="store_true", | |
help="Show timestamp") | |
parser.add_argument("-e", "--exchange", type=re.compile, | |
help="Include messages from this exchange (regex)") | |
parser.add_argument("-g", "--grep", | |
help="Include messages that have this string") | |
parser.add_argument("-d", "--duplicates", action="store_true", | |
help="Include duplicated messages") | |
parser.add_argument("--format", choices=["json", "json-oneline", "yaml"], | |
default="json", | |
help="Display things as yaml instead of json") | |
parser.add_argument("fields", nargs='*', type=jp_parse, | |
help="JSONpath for value to show") | |
args = parser.parse_args() | |
connection = pika.BlockingConnection( | |
pika.connection.URLParameters( | |
'amqp://admin:{{rabbitmq.admin_password}}@localhost:25672/%2f' | |
) | |
) | |
channel = connection.channel() | |
channel.message_count = 0 | |
channel.filter_count = 0 | |
channel.args = args | |
channel.request_identifiers = set() | |
channel.message_ids = set() | |
channel.show_all = (not args.grep) and (not args.exchange) and \ | |
(args.user == '-') and (args.project == '-') | |
if args.format == 'json': | |
channel.dump_short = lambda a: json.dumps(a, sort_keys=True) | |
channel.dump_long = lambda a: json.dumps(a, indent=1, sort_keys=True) | |
elif args.format == 'json-oneline': | |
channel.dump_short = lambda a: json.dumps(a, sort_keys=True) | |
channel.dump_long = channel.dump_short | |
elif args.format == 'yaml': | |
try: | |
import yaml | |
except: | |
print("pyyaml is needed if you want format as yaml") | |
def yaml_dump(a): | |
result = yaml.safe_dump(a, default_flow_style=False) | |
return result.rstrip() | |
channel.dump_short = yaml_dump | |
channel.dump_long = yaml_dump | |
queue = channel.queue_declare(exclusive=True) | |
channel.queue_bind( | |
exchange='amq.rabbitmq.trace', | |
queue=queue.method.queue, | |
routing_key='#' | |
) | |
channel.basic_consume(callback, queue=queue.method.queue, no_ack=True) | |
try: | |
channel.start_consuming() | |
except KeyboardInterrupt: | |
print() | |
print("%s messages received, %s shown" % (channel.message_count, | |
channel.filter_count)) | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment