Skip to content

Instantly share code, notes, and snippets.

@circa10a
Created August 8, 2020 23:17
Show Gist options
  • Save circa10a/9d69f84d9fb18aae19e6eac38de35cf8 to your computer and use it in GitHub Desktop.
Save circa10a/9d69f84d9fb18aae19e6eac38de35cf8 to your computer and use it in GitHub Desktop.
vimeo sre code eval
#!/usr/bin/env python3
import argparse
from datetime import datetime
from os.path import isfile
from logging import getLogger, basicConfig
# Format used both for the --start-time/--end-time input and for parsing those
# strings into datetime objects.
# NOTE: %f is the fractional-seconds field with microsecond precision
# (strptime accepts one to six digits), so millisecond values such as ".123"
# are valid but the field is not limited to milliseconds.
time_format = '%Y-%m-%d %H:%M:%S.%f'
# Validate user input of timeframes
def validate_date(date):
    """Check that a command-line timestamp matches ``time_format``.

    Used as the argparse ``type=`` callback for the time-range options.

    Args:
        date: Timestamp string supplied by the user.

    Returns:
        The original ``date`` string, unchanged, when it parses successfully.

    Raises:
        argparse.ArgumentTypeError: If ``date`` does not match ``time_format``.
    """
    try:
        datetime.strptime(date, time_format)
    except ValueError as err:
        msg = f"Invalid timeframe provided: {date}. Timeframe should be in {time_format} format"
        # Chain explicitly so the parsing failure is recorded as the direct cause.
        raise argparse.ArgumentTypeError(msg) from err
    return date
# Command-line interface: both time bounds and at least one log file are mandatory
parser = argparse.ArgumentParser(
    prog='HTTP Log Analyzer',
    usage='log_analysis.py [options]',
    description='Check error ratio of Vimeo HTTP requests',
)
parser.add_argument(
    '--start-time',
    required=True,
    type=validate_date,
    help='Begin time to check log timestamps',
)
parser.add_argument(
    '--end-time',
    required=True,
    type=validate_date,
    help='End time to check log timestamps',
)
parser.add_argument(
    '--log-files',
    required=True,
    nargs='+',
    help='Log files to check response codes',
)
# Logging: module-level logger, root logger configured for INFO and above
# with a "[LEVEL] message" line format
logger = getLogger(__name__)
basicConfig(level='INFO', format='[%(levelname)s] %(message)s')
# Ensure start time is not later than end time
def validate_start_end_input(start_time, end_time):
    """Abort the program when the time window is inverted.

    Args:
        start_time: Window start as a string in ``time_format``.
        end_time: Window end as a string in ``time_format``.

    Raises:
        SystemExit: With status 1 when ``start_time`` is strictly later than
            ``end_time``. Equal bounds are accepted.
        ValueError: If either string does not match ``time_format``.
    """
    window_start = datetime.strptime(start_time, time_format)
    window_end = datetime.strptime(end_time, time_format)
    if window_start > window_end:
        logger.error('Start time is greater than end time. Exiting...')
        # Raise SystemExit directly instead of calling exit(): that helper is
        # installed by the ``site`` module for interactive sessions and is not
        # available when the interpreter runs without it (e.g. ``python -S``).
        raise SystemExit(1)
# Ensure each log file exists before it is opened
def validate_file(file):
    """Abort the program when ``file`` is not an existing regular file.

    Only existence is checked (``os.path.isfile``); read permission is not
    verified here and would surface later when the file is opened.

    Args:
        file: Path of the log file to check.

    Raises:
        SystemExit: With status 1 when ``file`` does not exist or is not a
            regular file.
    """
    if isfile(file):
        return
    logger.error(f"File {file} not found. Exiting...")
    # Raise SystemExit directly instead of calling exit(): that helper is
    # installed by the ``site`` module for interactive sessions and is not
    # available when the interpreter runs without it (e.g. ``python -S``).
    raise SystemExit(1)
# Find every domain in results and output percentages
def output_results(start_time, end_time, http_requests):
    """Log the percentage of 5xx responses for every tracked domain.

    Args:
        start_time: Start of the analysed window; only interpolated into the
            header line.
        end_time: End of the analysed window; only interpolated into the
            header line.
        http_requests: Mapping of domain name to a dict holding the counters
            ``'http_requests_total'`` and ``'http_errors'``.
    """
    logger.info(f"Between {start_time} and {end_time}:")
    for domain, counters in http_requests.items():
        total = counters['http_requests_total']
        errors = counters['http_errors']
        # A domain with no counted requests has no meaningful ratio; report
        # 0.0 instead of failing with ZeroDivisionError.
        error_pct = round(errors / total * 100, 2) if total else 0.0
        logger.info(f"{domain} returned {error_pct}% 5xx errors")
if __name__ == "__main__":
    # Parse user arguments (argparse has already checked each timestamp's format)
    args = parser.parse_args()
    # Ensure user provides comparable times
    validate_start_end_input(start_time=args.start_time, end_time=args.end_time)
    # Convert user input to date objects
    start_time = datetime.strptime(args.start_time, time_format)
    end_time = datetime.strptime(args.end_time, time_format)
    # Fail fast: check every file up front so a missing file is reported
    # before any time is spent reading the others
    for file in args.log_files:
        validate_file(file)
    # Per-domain counters:
    # {domain: {'http_requests_total': int, 'http_errors': int}}
    http_requests = {}
    # Begin reading all log files
    for file in args.log_files:
        skipped_lines = 0
        with open(file, 'r') as log:
            for line in log:
                # Blank lines carry no request; ignore them silently
                if not line.strip():
                    continue
                # Fields are pipe-separated: index 0 is an epoch timestamp,
                # index 2 the domain and index 4 the HTTP status code
                log_entry = [entry.strip() for entry in line.split('|')]
                try:
                    # NOTE: fromtimestamp() without a tz yields naive local
                    # time, so the user-supplied window is compared in the
                    # local timezone of the machine running this script
                    log_entry_time = datetime.fromtimestamp(float(log_entry[0]))
                    log_entry_domain = log_entry[2]
                    log_entry_status_code = log_entry[4]
                except (IndexError, ValueError, OverflowError, OSError):
                    # Too few fields, or a timestamp that is non-numeric or
                    # out of range: skip the line instead of aborting the run
                    skipped_lines += 1
                    continue
                # Only count entries inside the (inclusive) user time window
                if not start_time <= log_entry_time <= end_time:
                    continue
                # Initialize dictionary item (per domain)
                domain_stats = http_requests.get(log_entry_domain)
                if domain_stats is None:
                    domain_stats = http_requests[log_entry_domain] = {
                        'http_requests_total': 0,
                        'http_errors': 0,
                    }
                # Count each line as a request
                domain_stats['http_requests_total'] += 1
                # If http status code is in the 500's, record it
                if log_entry_status_code.startswith('5'):
                    domain_stats['http_errors'] += 1
        if skipped_lines:
            logger.warning(f"Skipped {skipped_lines} malformed line(s) in {file}")
    # Read dictionary to calculate results
    output_results(start_time, end_time, http_requests)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment