Last active
June 7, 2023 22:08
-
-
Save davydany/e9db7d7a50b49b1e0409ca096bbcc718 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
"""
LogScan - by David Daniel (david.daniel@cyware.com)
===================================================

Usage
-----
usage: logscan.py [-h] [--end_time end_time] [--start_time start_time]
                  directory [directory ...] date pattern

positional arguments:
  directory             comma separated list of directories to search
  date                  date to search for in the format YYYY-MM-DD or DD-MM-
                        YYYY
  pattern               pattern to search for in the log lines

optional arguments:
  -h, --help            show this help message and exit
  --end_time end_time
                        timestamp to search for before, in the format HH:MM:SS
  --start_time start_time
                        timestamp to search for after, in the format HH:MM:SS
"""
import argparse | |
import os | |
import re | |
import logging | |
import sys | |
import shutil | |
# Width of the current terminal in columns; used for the separator rule
# printed between results.
terminal_width = shutil.get_terminal_size().columns

# File names treated as logs when old logs are included:
# "<name>.log" or a rotated "<name>.log.<digits>".
log_file_pattern = re.compile(r'^.+\.log(\.\d+)?$')

# One stream handler attached to the root logger, both starting at INFO.
# main() lowers both to DEBUG when --verbose is given.
sh = logging.StreamHandler()
sh.setLevel(logging.INFO)

logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(sh)

# Filtered out at the INFO level configured above.
logger.debug("Starting up...")
class TerminalColors:
    """ANSI escape sequences used to colour and style terminal output.

    Wrap text as ``<code> + text + TerminalColors.ENDC`` so the styling
    does not leak into whatever is printed next.
    """

    # Reset: turns off every attribute set by the codes below.
    ENDC = '\033[0m'

    # Text attributes.
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'

    # Foreground colours.
    YELLOW = '\033[33m'
    PURPLE = '\033[35m'
    PINK = '\033[91m'
    GREEN = '\033[92m'
    BLUE = '\033[94m'
    CYAN = '\033[96m'

    # Same escape code as PINK; kept as a separate, intent-revealing name.
    FAIL = '\033[91m'
def process_log_file(filepath, date, pattern, end_time, start_time, context_before, context_after):
    """Collect the lines of one log file that match a date, time window and text.

    A line is a hit when all of the following hold:

    * it contains ``date`` as a plain substring;
    * it contains a date-shaped token (``YYYY-MM-DD``, else ``DD-MM-YYYY``)
      and an ``HH:MM:SS`` token;
    * its time is strictly before ``end_time`` and strictly after
      ``start_time`` (a bound of ``None`` is not applied);
    * it contains ``pattern``, compared case-insensitively as a substring.

    Times are compared as strings, which orders correctly only for
    zero-padded ``HH:MM:SS`` values.

    Args:
        filepath: Path of the log file to read.
        date: Date text every hit must contain.
        pattern: Text to look for in the line (not a regular expression).
        end_time: Exclusive upper time bound as ``HH:MM:SS``, or ``None``.
        start_time: Exclusive lower time bound as ``HH:MM:SS``, or ``None``.
        context_before: Number of preceding lines to return with each hit;
            ``None``, zero or a negative value returns none.
        context_after: Number of following lines to return with each hit;
            ``None``, zero or a negative value returns none.

    Returns:
        A list of tuples ``(filepath, index, date, time, line, lines_before,
        lines_after)`` in file order. ``index`` is the zero-based position of
        the hit, ``date`` and ``time`` are the tokens extracted from the line,
        and the two context lists hold raw lines (line endings included).
    """
    needle = pattern.lower()
    iso_date = re.compile(r"\d{4}-\d{2}-\d{2}")
    dmy_date = re.compile(r"\d{2}-\d{2}-\d{4}")
    clock = re.compile(r"\d{2}:\d{2}:\d{2}")

    # Treat missing/negative context sizes as "no context" so they can never
    # turn into negative slice indices below.
    before_count = max(context_before or 0, 0)
    after_count = max(context_after or 0, 0)

    # errors="replace" keeps one undecodable byte in a log from aborting the
    # whole scan. All lines are kept in memory because context is sliced
    # from them.
    with open(filepath, "r", errors="replace") as f:
        lines = f.readlines()

    collected = []
    for i, line in enumerate(lines):
        if date not in line:
            continue

        date_match = iso_date.search(line) or dmy_date.search(line)
        if not date_match:
            continue
        time_match = clock.search(line)
        if not time_match:
            continue

        # Keep the extracted tokens in their own names: rebinding ``date``
        # here would silently change the filter applied to later lines.
        line_date = date_match.group()
        time = time_match.group()

        if end_time is not None and not time < end_time:
            continue
        if start_time is not None and not time > start_time:
            continue
        if needle not in line.lower():
            continue

        # Slices clamp at the end of the list; only the start needs a floor.
        lines_before = lines[max(0, i - before_count):i]
        lines_after = lines[i + 1:i + 1 + after_count]
        collected.append((filepath, i, line_date, time, line, lines_before, lines_after))

    return collected
def _should_scan(filename, abspath, match, omit, search_old_logs):
    """Decide whether one file found during the directory walk is scanned.

    With ``match`` set, the file is selected when ``match`` occurs in its
    path (case-insensitive). Otherwise it is selected by name: ``*.log``,
    plus rotated ``*.log.<n>`` files when ``search_old_logs`` is truthy.
    ``omit`` always wins: a path containing it is never scanned.
    """
    if match:
        selected = match.lower() in abspath.lower()
    elif search_old_logs:
        selected = bool(log_file_pattern.match(filename))
    else:
        selected = filename.endswith('.log')

    if omit and omit.lower() in abspath.lower():
        logger.info(f"Omitting { abspath }")
        selected = False
    return selected


def _highlight_match(line, pattern):
    """Return ``line`` with the first case-insensitive ``pattern`` hit styled."""
    start = line.lower().find(pattern.lower())
    if start < 0:
        # Nothing to highlight; return the line unchanged rather than
        # slicing with a negative offset.
        return line
    end = start + len(pattern)
    return f"{ line[0:start] }{ TerminalColors.UNDERLINE }{ TerminalColors.PINK }{ line[start:end] }{ TerminalColors.ENDC }{ line[end:] }"


def main(argv=None):
    """Run the logscan command line: walk directories, scan logs, print hits.

    Args:
        argv: Argument list to parse instead of ``sys.argv[1:]``. The default
            ``None`` keeps the normal command-line behaviour.

    Output is one block per hit, ordered by date, time, file path and line
    index: a separator rule, any requested context lines before the hit, the
    hit itself with the matched text highlighted, then any context lines
    after it.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("directory", type=str, nargs='+', help="One or more directories to search")
    parser.add_argument("date", type=str, help="Date to search for in the format YYYY-MM-DD")
    parser.add_argument("pattern", type=str, help="Pattern to search for in the log lines")
    parser.add_argument("-s", "--start_time", type=str, help="(Optional) timestamp to start the search for, in the format HH:MM:SS")
    parser.add_argument("-e", "--end_time", type=str, help="(Optional) timestamp to end the search at, in the format HH:MM:SS")
    parser.add_argument("-m", "--match", type=str, help="Checks if the given pattern is part of the log's filename")
    parser.add_argument("-o", "--omit", type=str, help="If the part of filepath matches the filename that logscan detects, then that filename is omitted")
    parser.add_argument("-c", "--clear", action="store_true", help="Automatically clears stdout with this flag.")
    parser.add_argument("-v", "--verbose", action="store_true", help="Prints additional information to show what logscan is doing.")
    parser.add_argument('-b', '--before', type=int, help='Number of lines of context to print Before the matched line.')
    parser.add_argument('-a', '--after', type=int, help='Number of lines of context to print AFTER the matched line.')
    # NOTE: declared with type=int although the help text calls it a flag, so
    # it needs a non-zero value (e.g. "-l 1") to take effect. Left as is to
    # avoid breaking existing invocations.
    parser.add_argument('-l', '--search-old-logs', type=int, help='With this flag, logscan will not only look at ".log" files, but also any old files that end in ".log.{{ number }}"')
    args = parser.parse_args(argv)

    if args.verbose:
        logger.setLevel(logging.DEBUG)
        sh.setLevel(logging.DEBUG)

    if args.clear:
        # clear the screen before beginning
        os.system('cls' if os.name == 'nt' else 'clear')

    pattern = args.pattern
    collected = []
    for directory in args.directory:
        for root, _, files in os.walk(directory):
            for filename in files:
                abspath = os.path.join(root, filename)
                if not _should_scan(filename, abspath, args.match, args.omit, args.search_old_logs):
                    continue

                if args.verbose:
                    sys.stdout.write(f"\r Processing { abspath }:")
                output = process_log_file(abspath, args.date, pattern, args.end_time, args.start_time, args.before, args.after)
                sys.stdout.flush()
                if args.verbose:
                    logger.debug(f" { len(output) } results")
                collected += output

    if args.verbose:
        logger.info(f"Number of results: { len(collected) }")

    # Sort on the structured fields rather than on the rendered text, so
    # that line indexes order numerically (9 before 10) within a timestamp.
    logger.debug("Sorting results by date/time")
    collected.sort(key=lambda hit: (hit[2], hit[3], hit[0], hit[1]))

    logger.debug("Outputting results...")
    for filepath, index, date, time, line, lines_before, lines_after in collected:
        time_prefix = f"{TerminalColors.CYAN}[{ date }:{ time }]{ TerminalColors.ENDC }"
        file_prefix = f"{ TerminalColors.GREEN }{ filepath }:{ TerminalColors.BLUE }{ index }{ TerminalColors.ENDC }"
        hit_line = "{0}{1: <75}{2}".format(time_prefix, file_prefix, _highlight_match(line, pattern)).strip()

        # print a separator so it is easy to read
        print('=' * terminal_width)

        # context before the hit, labelled -N ... -1 (empty when not requested)
        total_lines_before = len(lines_before)
        for offset, before_line in enumerate(lines_before):
            line_no_prefix = "{0}{1: >2}{2}".format(TerminalColors.CYAN, "-" + str(total_lines_before - offset), TerminalColors.ENDC)
            print(f"[{ line_no_prefix }]{ TerminalColors.PURPLE }{ before_line.strip() }{ TerminalColors.ENDC }")

        # the hit line itself
        print(hit_line)

        # context after the hit, labelled +1 ... +N (empty when not requested)
        for offset, after_line in enumerate(lines_after):
            line_no_prefix = "{0}{1: >2}{2}".format(TerminalColors.CYAN, "+" + str(offset + 1), TerminalColors.ENDC)
            print(f"[{ line_no_prefix }]{ TerminalColors.PURPLE }{ after_line.strip() }{ TerminalColors.ENDC }")
if __name__ == "__main__": | |
main() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment