Skip to content

Instantly share code, notes, and snippets.

@harlowja
Created September 4, 2014 04:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save harlowja/b7ee51133215b7eded82 to your computer and use it in GitHub Desktop.
Save harlowja/b7ee51133215b7eded82 to your computer and use it in GitHub Desktop.
logviewer.py
import argparse
import contextlib
import logging
import re
import readline
import sys
import termios
import tty
import html5lib
import requests
import six
import termcolor
# Maps a log level name, as it appears in a log line, to the termcolor color
# used to render that name.
COLOR_MAP = {
    # Low-severity levels.
    'DEBUG': 'blue',
    'INFO': 'cyan',
    # Both spellings of the warning level share one color.
    'WARNING': 'yellow',
    'WARN': 'yellow',
    # Everything at error severity or above is shown in red.
    'ERROR': 'red',
    'CRITICAL': 'red',
    'FATAL': 'red',
}
def highlight(line):
    """Return *line* with every known log level name wrapped in its color.

    The level names and their colors come from ``COLOR_MAP``; integer keys
    are translated to their names with ``logging.getLevelName``.

    All names are substituted in a single pass. Replacing them one after
    another would colorize "WARNING" and then find "WARN" again inside the
    already colored text (or the other way around, depending on dict order),
    producing nested or split escape sequences.

    :param line: the log line to colorize
    :returns: the line with each level name replaced by its colored form
    """
    colors = {}
    for level, color in COLOR_MAP.items():
        # Numeric keys (e.g. logging.ERROR) are looked up by their name.
        name = logging.getLevelName(level) if isinstance(level, int) else level
        colors[name] = color
    if not colors:
        # An empty alternation would match at every position; nothing to do.
        return line
    # Longest names first so that "WARNING" wins over its prefix "WARN".
    alternatives = sorted(colors, key=len, reverse=True)
    pattern = re.compile('|'.join(re.escape(name) for name in alternatives))
    return pattern.sub(
        lambda match: termcolor.colored(match.group(0),
                                        colors[match.group(0)]),
        line)
def get_files(base_url):
    """Return the distinct ``txt.gz`` names found in the page at *base_url*.

    The page is fetched and parsed as HTML; every character-data token whose
    text ends with ``txt.gz`` is collected, keeping first-seen order and
    dropping duplicates.

    :param base_url: url of the page to scan
    :returns: list of matching text strings
    """
    found = []
    with contextlib.closing(requests.get(base_url)) as response:
        tree = html5lib.parse(response.content)
        tokens = html5lib.getTreeWalker("etree")(tree)
        for token in tokens:
            # Only text nodes are of interest; tags, comments etc. are skipped.
            if token.get('type') == 'Characters':
                text = token.get('data')
                if not text or not text.endswith('txt.gz'):
                    continue
                if text not in found:
                    found.append(text)
    return found
def filtered(args, line):
    """Return True when *line* should be hidden from the output.

    With no levels requested nothing is hidden. Otherwise the fourth
    whitespace-separated field of the line is taken as its level and the
    line is kept only if that level (compared case-insensitively) is one of
    ``args.levels``; lines with fewer than four fields are hidden.

    :param args: object with a ``levels`` list of level names
    :param line: the log line to examine
    :returns: False to show the line, True to skip it
    """
    # Format appears to be 2014-09-04 02:45:27.685 27534 <LEVEL> ...
    if not args.levels:
        return False
    wanted = [name.lower() for name in args.levels]
    fields = line.split()
    if len(fields) < 4:
        # No level column at all, so it cannot match a requested level.
        return True
    return fields[3].strip().lower() not in wanted
def getch():
    """Read and return a single character from stdin without waiting for Enter.

    The terminal attached to stdin is switched to raw mode for the read and
    its previous settings are restored afterwards, even if the read fails.
    """
    # Variation of http://code.activestate.com/recipes/134892/
    stdin_fd = sys.stdin.fileno()
    saved_attrs = termios.tcgetattr(stdin_fd)
    try:
        tty.setraw(stdin_fd)
        return sys.stdin.read(1)
    finally:
        # Always put the terminal back the way it was found.
        termios.tcsetattr(stdin_fd, termios.TCSADRAIN, saved_attrs)
def _prompt_for_file(filenames):
    """Ask until the user picks one of *filenames*; return the chosen name.

    The answer may be a file name or the number shown next to it in the menu.
    """
    while True:
        answer = six.moves.input('Enter a file name: ')
        if answer in filenames:
            return answer
        choice = answer.strip()
        if choice in filenames:
            return choice
        # The menu is numbered from 1, so accept that number as well.
        try:
            number = int(choice)
        except ValueError:
            number = 0
        if 1 <= number <= len(filenames):
            return filenames[number - 1]
        print("'%s' is not a valid file name." % answer)


def _page_file(args, url, page_size=30, quit_keys=('q', 'Q', '\x03')):
    """Stream *url* and print its unfiltered lines one page at a time.

    After every *page_size* printed lines a key press is awaited; one of
    *quit_keys* aborts. ``getch`` reads in raw mode, where Ctrl-C arrives as
    the character '\\x03' instead of raising KeyboardInterrupt, so it is
    treated as a quit key too.

    :returns: False if the user asked to quit, True once the whole file was
        shown (after printing the displayed/total line summary)
    """
    total_lines = 0
    total_displayed = 0
    on_page = 0
    with contextlib.closing(requests.get(url, stream=True)) as r:
        for line in r.iter_lines():
            total_lines += 1
            # On Python 3 the lines arrive as bytes, which the text based
            # filtering and highlighting cannot handle.
            if six.PY3 and isinstance(line, bytes):
                line = line.decode('utf-8', 'replace')
            if filtered(args, line):
                continue
            print(highlight(line))
            on_page += 1
            total_displayed += 1
            if on_page >= page_size:
                if getch() in quit_keys:
                    return False
                on_page = 0
    print("Displayed %s/%s lines" % (total_displayed, total_lines))
    return True


def main():
    """Interactively browse the log files listed at the url given on argv.

    Lists the files found at the url, lets the user pick one, pages through
    it (optionally restricted to ``--levels``) and repeats until the user
    quits or declines to continue.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("url", metavar='url', type=str,
                        help="url to scan for log files",
                        default=None)
    parser.add_argument("--levels", metavar='levels', action="append",
                        help="levels to show (default: all)",
                        default=[])
    args = parser.parse_args()
    if not args.url:
        parser.print_help()
        return
    filenames = get_files(args.url)
    if not filenames:
        # Without this the file name prompt could never be satisfied.
        print("No log files found at '%s'" % args.url)
        return
    # Avoid a doubled slash when the url was given with a trailing one.
    base_url = args.url.rstrip('/')
    while True:
        print("Available file names")
        print("-" * len("Available file names"))
        for number, name in enumerate(filenames, 1):
            print("%s. %s" % (number, name))
        filename = _prompt_for_file(filenames)
        if not _page_file(args, "%s/%s" % (base_url, filename)):
            return
        more = six.moves.input("Continue? ")
        if more.lower().strip() not in ('', 'y', 'yes'):
            break
# Run the interactive viewer only when executed as a script, not on import.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment