Skip to content

Instantly share code, notes, and snippets.

@kvdveer
Last active January 4, 2016 08:09
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kvdveer/8593166 to your computer and use it in GitHub Desktop.
Save kvdveer/8593166 to your computer and use it in GitHub Desktop.
OpenStack firehose client
#!/usr/bin/env python
# Copyright 2014 CloudVPS
# Author: Koert van der Veer <koert@cloudvps.com>
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Be sure to set up a firehose: http://www.rabbitmq.com/firehose.html
from __future__ import print_function
import json
import pika
import datetime
import re
import argparse
# jsonpath_rw is optional: when it cannot be imported, jsonpath_parse is bound
# to None so the rest of the module can still be loaded.
try:
    from jsonpath_rw import parse as jsonpath_parse
except ImportError:
    jsonpath_parse = None
def _is_selected(channel, properties, data):
    """Return True when the decoded message passes any active filter.

    The checks run in the same order as the command-line filters are listed
    and stop at the first one that accepts the message.
    """
    args = channel.args

    # No filter given on the command line: everything is shown.
    if channel.show_all:
        return True

    if args.project == data.get('_context_project_id'):
        return True
    if args.project == data.get('_context_project_name'):
        return True
    if args.user == data.get('_context_user_id'):
        return True

    if args.exchange:
        exchange = properties.headers['exchange_name']
        if args.exchange.match(exchange):
            return True

    if args.related:
        # Identifiers collected from messages that were shown earlier.
        known = channel.request_identifiers
        if data.get('_context_request_id') in known:
            return True
        if data.get('_msg_id') in known:
            return True

    if args.grep:
        grep = args.grep
        if grep in json.dumps(data) or grep in repr(properties.headers):
            return True

    return False


def callback(channel, method, properties, body):
    """Handle one consumed message: count it, filter it, maybe display it.

    Args:
        channel: Channel object carrying the state set up by the script
            (``message_count``, ``message_ids``, ``show_all``, ``args``,
            ``request_identifiers``).
        method: Delivery information, passed through to ``show_message``.
        properties: Message properties; ``properties.headers`` is read for
            the exchange and grep filters.
        body: JSON text of the message.

    Any exception raised while handling the message is printed and
    swallowed so that one bad message does not stop the consumer.
    """
    try:
        channel.message_count += 1
        data = json.loads(body)
        # When an "oslo.message" key is present its value is itself a JSON
        # document; that inner document is the message that gets filtered.
        if "oslo.message" in data:
            data = json.loads(data['oslo.message'])

        # Skip messages that were already displayed.  A message without a
        # "_unique_id" cannot be identified, so it is never treated as a
        # duplicate (otherwise a stored None would suppress all of them).
        unique_id = data.get("_unique_id")
        if unique_id is not None and unique_id in channel.message_ids:
            return

        if _is_selected(channel, properties, data):
            show_message(channel, method, properties, data)
    except Exception as e:
        print(e)
def show_message(channel, method, properties, body):
    """Print one accepted message and remember the identifiers it carries.

    Args:
        channel: Channel object carrying the script state (``filter_count``,
            ``request_identifiers``, ``message_ids``, ``args`` and the
            ``dump_short`` / ``dump_long`` formatters).
        method: Unused here; accepted so the signature mirrors ``callback``.
        properties: Message properties; ``properties.headers`` is printed
            when the AMQP option is enabled.
        body: Decoded message as a dict.  It is modified in place: context
            and private keys are removed unless the matching options are set.
    """
    args = channel.args
    channel.filter_count += 1

    # Remember request/message ids so related messages can be matched later.
    if args.related:
        for id_key in ('_context_request_id', '_msg_id'):
            if id_key in body:
                channel.request_identifiers.add(body[id_key])

    # Remember the unique id for duplicate suppression.  A missing id is not
    # recorded: storing None would mark every id-less message as a duplicate.
    if not args.duplicates:
        unique_id = body.get("_unique_id")
        if unique_id is not None:
            channel.message_ids.add(unique_id)

    # Iterate over a snapshot of the keys: deleting from a dict while
    # iterating over it raises RuntimeError on Python 3.
    if not args.context:
        for key in list(body):
            if key.startswith("_context_"):
                del body[key]
    if not args.private:
        for key in list(body):
            if key.startswith("_") and not key.startswith('_context_'):
                del body[key]

    if args.timestamp:
        print("# TIME: %s" % datetime.datetime.now())

    if args.amqp:
        # Continuation lines of multi-line values are aligned under the value.
        indent = "#" + 16*" "
        for k, v in sorted(properties.headers.items()):
            value = channel.dump_short(v)
            value = value.replace('\n', '\n' + indent).rstrip("\n# ")
            print("#%-14s: %s" % (k, value))

    if args.fields:
        # Each field object is expected to offer find(body), yielding matches
        # that expose the matched data as ``.value``.
        for fld in args.fields:
            title = str(fld)
            matches = fld.find(body)
            for match in matches:
                value = channel.dump_short(match.value)
                value = value.replace('\n', '\n' + " "*(len(title)+2)).strip()
                print("%s: %s" % (title, value))
            if not matches:
                print("%s not found" % (title))
    else:
        print(channel.dump_long(body))
    print()
def jp_parse(value):
    """Convert a command-line string into a parsed JSONpath expression.

    Used as an argparse ``type=`` callable.  Failures are raised as
    ``argparse.ArgumentTypeError`` so argparse reports them as a usage error
    carrying the message below instead of a traceback or a generic
    "invalid jp_parse value".

    Args:
        value: JSONpath expression as typed by the user.

    Returns:
        Whatever ``jsonpath_parse`` returns for ``value``.

    Raises:
        argparse.ArgumentTypeError: if jsonpath_rw is not installed or the
            expression cannot be parsed.
    """
    # jsonpath_parse is None when the optional jsonpath_rw import failed.
    if jsonpath_parse is None:
        raise argparse.ArgumentTypeError(
            "jsonpath_rw is needed if you want to select fields")
    try:
        return jsonpath_parse(value)
    except Exception as e:
        # NOTE(review): jsonpath_rw appears to signal syntax errors with a
        # plain Exception, which argparse would not catch -- hence the broad
        # except here.  Confirm against the installed version.
        raise argparse.ArgumentTypeError(
            "invalid JSONpath %r: %s" % (value, e))
def main():
    """Parse the command line, consume the trace exchange and print messages.

    The selected output format is resolved before the broker connection is
    opened, so a missing optional dependency is reported immediately rather
    than on every received message.  Consumption runs until interrupted with
    Ctrl-C, after which the connection is closed and a summary is printed.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("-r", "--related", action="store_true",
                        help="Include messages related to the accepted ones")
    parser.add_argument("-p", "--project", type=str, default='-',
                        help="Include messages for selected project")
    parser.add_argument("-u", "--user", type=str, default='-',
                        help="Include messages for selected userid")
    parser.add_argument("-c", "--context", action="store_true",
                        help="Show context fields of messages")
    parser.add_argument("-P", "--private", action="store_true",
                        help="Show hidden fields of messages")
    parser.add_argument("-a", "--amqp", action="store_true",
                        help="Show AMQP headers of messages")
    parser.add_argument("-t", "--timestamp", action="store_true",
                        help="Show timestamp")
    parser.add_argument("-e", "--exchange", type=re.compile,
                        help="Include messages from this exchange (regex)")
    parser.add_argument("-g", "--grep",
                        help="Include messages that have this string")
    parser.add_argument("-d", "--duplicates", action="store_true",
                        help="Include duplicated messages")
    parser.add_argument("--format", choices=["json", "json-oneline", "yaml"],
                        default="json",
                        help="Output format for displayed messages "
                             "(default: json)")
    parser.add_argument("fields", nargs='*', type=jp_parse,
                        help="JSONpath for value to show")
    args = parser.parse_args()

    # Pick the formatters: dump_short for single values (headers, selected
    # fields), dump_long for whole message bodies.
    if args.format == 'yaml':
        try:
            import yaml
        except ImportError:
            # Exit with a usage error; continuing would only fail later with
            # a NameError for every message.
            parser.error("pyyaml is needed if you want format as yaml")

        def dump_short(a):
            return yaml.safe_dump(a, default_flow_style=False).rstrip()

        dump_long = dump_short
    else:
        def dump_short(a):
            return json.dumps(a, sort_keys=True)

        if args.format == 'json-oneline':
            dump_long = dump_short
        else:
            def dump_long(a):
                return json.dumps(a, indent=1, sort_keys=True)

    # NOTE(review): the {{...}} part of the URL looks like a template
    # placeholder that is substituted before deployment -- confirm.
    connection = pika.BlockingConnection(
        pika.connection.URLParameters(
            'amqp://admin:{{rabbitmq.admin_password}}@localhost:25672/%2f'
        )
    )
    channel = connection.channel()

    # The channel object doubles as the state container handed to callback().
    channel.message_count = 0
    channel.filter_count = 0
    channel.args = args
    channel.request_identifiers = set()
    channel.message_ids = set()
    # Without any selecting option every message is displayed.
    channel.show_all = not (args.grep or args.exchange) and \
        (args.user == '-') and (args.project == '-')
    channel.dump_short = dump_short
    channel.dump_long = dump_long

    # Temporary exclusive queue bound to every routing key of the trace
    # exchange.
    queue = channel.queue_declare(exclusive=True)
    channel.queue_bind(
        exchange='amq.rabbitmq.trace',
        queue=queue.method.queue,
        routing_key='#'
    )
    channel.basic_consume(callback, queue=queue.method.queue, no_ack=True)

    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        print()
    finally:
        # Release the broker connection instead of leaving it to process exit.
        if connection.is_open:
            connection.close()

    print("%s messages received, %s shown" % (channel.message_count,
                                              channel.filter_count))
# Run the client only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment