Created
February 11, 2014 10:13
-
-
Save elephantum/8932295 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
# apt-get install python-pandas python-boto | |
# BUG FIX: the shared gist redacted the real credentials, leaving
# "AWS_ACCESS_KEY_ID =" with no right-hand side — a SyntaxError.
# Keep the names but default to empty strings; fill these in (or load
# them from the environment) before running.
AWS_ACCESS_KEY_ID = ''
AWS_SECRET_ACCESS_KEY = ''
import datetime | |
import gzip | |
import json | |
import os | |
import pandas | |
import sys | |
import time | |
import re | |
import numpy | |
import boto.ec2.cloudwatch | |
# 'VERB /uri ...' -> (verb, uri); the optional '/?' tolerates a doubled
# leading slash in the request line.
parts_re = re.compile(r'(\w+) /?(/[^ ]*) .*')
# First path component of a URI, e.g. '/img/foo.png' -> '/img'.
url_1part_re = re.compile(r'(/[^/]+).*')
# First two path components; the second stops at '?' so query strings
# are excluded, e.g. '/api/mag/123?x=1' -> '/api/mag'.
url_2parts_re = re.compile(r'(/[^/]+(/[^/?]+)?).*')
# TODO inode support | |
# { | |
# filename: { | |
# last_size :: int pointer offset | |
# last_ts :: timestamp | |
# } | |
# } | |
# JSON file persisting the per-file tail offsets (see load_data) between runs.
STATE_FILE = '/home/elephantum/rm-monitoring/state.json'
# nginx access log to tail; each line is expected to be a JSON object
# (see parse_json_data).
LOG_FILE = '/var/log/rm-release/nginx-access.log'
def load_data(state, filename):
    '''
    Read the lines appended to `filename` since the previous invocation.

    state    :: dict mapping filename -> {'last_size': int byte offset,
                'last_ts': float unix timestamp} from the previous run
    filename :: str, path of the log file to tail
    return   :: (state, lines :: [str], time_slice :: float seconds
                covered by the returned lines)

    On the very first call for a file nothing is returned; the current
    size/timestamp are recorded so the next call has a baseline.
    '''
    cur_size = os.path.getsize(filename)
    cur_ts = time.time()

    if filename not in state:
        # First time we see this file: record a baseline, return no lines.
        state[filename] = {
            'last_size': cur_size,
            'last_ts': cur_ts,
        }
        return state, [], 0

    file_state = state[filename]

    # The file shrank, so it was rotated/truncated: restart from the top.
    if file_state['last_size'] > cur_size:
        file_state['last_size'] = 0

    # BUG FIX: the original opened the file with the Python 2 `file()`
    # builtin and never closed the handle (resource leak); use a
    # context manager so it is always closed.
    with open(filename) as f:
        f.seek(file_state['last_size'])
        lines = f.readlines()
        time_slice = cur_ts - file_state['last_ts']
        file_state['last_size'] = f.tell()

    file_state['last_ts'] = cur_ts
    return state, lines, time_slice
def parse_json_data(lines):
    '''
    Parse JSON-encoded log lines into a pandas.DataFrame.

    lines  :: [str], one JSON object per line
    return :: pandas.DataFrame, one row per successfully parsed line

    Malformed lines (e.g. a partial write at the tail of a live log)
    are skipped on purpose rather than aborting the whole batch.
    '''
    parsed_data = []
    for line in lines:
        try:
            parsed_data.append(json.loads(line))
        # BUG FIX: the original used a bare `except:` which also swallows
        # KeyboardInterrupt/SystemExit; json.loads raises ValueError
        # (JSONDecodeError is its subclass), so catch only that.
        except ValueError:
            pass
    return pandas.DataFrame(parsed_data)
# nginx $time_local layout, e.g. "11/Feb/2014:10:13:00 +0000".
_NGINX_TIME_FORMAT = '%d/%b/%Y:%H:%M:%S +0000'


def parse_nginx_time(x):
    '''Convert an nginx time_local string into a (naive) datetime.'''
    return datetime.datetime.strptime(x, _NGINX_TIME_FORMAT)
def parse_nginx_float(x):
    '''
    Parse a numeric nginx log field; nginx writes '-' for missing
    values, which is mapped to 0.

    BUG FIX: the original used the Python 2-only `<>` operator, which
    was removed in Python 3; `!=` is the equivalent spelling on both.
    '''
    return float(x) if x != '-' else 0
# URL prefixes handled by the static-assets service.
_STATIC_PREFIXES = ('/img', '/js', '/build', '/css')


def guess_service(x):
    '''Map a raw request line ("GET /uri HTTP/1.1") to a coarse service
    bucket: Backend, Screenshot, Static, Frontend or Other.'''
    match = parts_re.match(x)
    if match is None:
        return 'Other'
    uri = match.group(2)
    if uri.startswith('/api'):
        return 'Backend'
    if uri.startswith('/screenshot'):
        return 'Screenshot'
    if uri.startswith(_STATIC_PREFIXES):
        return 'Static'
    return 'Frontend'
# Ordered (module, prefixes) classification table. Order matters because
# prefixes overlap: e.g. '/api/mag' (Constructor) shadows '/api/magpages'
# (Collector), exactly as in the original if/elif cascade.
_BACKEND_MODULE_PREFIXES = [
    ('BackendConstructor', (
        '/api/mag', '/api/magFull', '/api/widgetsGroup', '/api/publish',
        '/api/unpublish', '/api/checkdomain', '/api/domainmaptest',
        '/api/monitoring', '/api/pageOrder', '/api/page_templates',
        '/api/widget', '/api/guides', '/api/imageeffect',
        '/api/imageeffects', '/api/imagefinal', '/api/imagepreview',
        '/api/imagescale', '/api/page',
    )),
    ('BackendStatic', (
        '/api/upload', '/api/template_images',
    )),
    ('BackendUser', (
        '/api/authservice', '/api/user', '/api/me', '/api/register',
        '/api/checkusername', '/api/redirect', '/api/changepass',
        '/api/ping', '/api/auth', '/api/recoverypass',
    )),
    ('BackendCollector', (
        '/api/readymag', '/api/readymags', '/api/embed/readymag',
        '/api/domain/readymag', '/api/domain/readymags/user',
        '/api/likemag', '/api/unlikemag', '/api/likers/mag',
        '/api/magpages', '/api/countView', '/api/countview',
        '/api/datetime',
    )),
]


def guess_backend_module(x):
    '''Classify a raw request line into a backend sub-module bucket.

    Returns 'Unknown' for unparseable request lines, a Backend* label
    for /api URLs, and None for everything else (the original fell off
    the end implicitly; presumably those rows are meant to be dropped
    by the groupby downstream — behavior kept as-is).
    '''
    match = parts_re.match(x)
    if match is None:
        return 'Unknown'
    uri = match.group(2)
    for module, prefixes in _BACKEND_MODULE_PREFIXES:
        if uri.startswith(prefixes):
            return module
    if uri.startswith('/api'):
        return 'BackendOther'
    return None
def guess_req_type(x):
    '''Derive a coarse request-type label from a raw request line.

    Screenshot/static prefixes collapse to their first path component,
    /api URLs to their first two components, '/' stays '/', anything
    else becomes 'other'; unparseable request lines become '???'.
    '''
    match = parts_re.match(x)
    if match is None:
        return '???'
    uri = match.group(2)
    if uri.startswith(('/screenshot', '/img', '/build', '/js')):
        return url_1part_re.match(uri).group(1)
    if uri.startswith('/api'):
        return url_2parts_re.match(uri).group(1)
    return '/' if uri == '/' else 'other'
# Latency aggregations applied per group; the dict keys become the
# column names of the DataFrame returned by .agg() in process_data,
# and are interpolated into CloudWatch metric names ('...LatencyP95').
_percentiles_funs = {
    'Mean': lambda x: numpy.mean(x),
    'P75': lambda x: numpy.percentile(x, 75),
    # 'P90': lambda x: numpy.percentile(x, 90),
    'P95': lambda x: numpy.percentile(x, 95),
}
def process_data(data, time_slice):
    '''
    Turn one batch of parsed log records into CloudWatch metric tuples.

    data       :: pandas.DataFrame of raw nginx log fields; expected
                  columns: 'msec', 'request_time', 'time_local',
                  'request' (TODO confirm against the nginx log_format)
    time_slice :: float, seconds since the previous batch; converts raw
                  counts into QPS
    yields     :: (metric_name :: str, unit :: str, value :: float)

    NOTE(review): mutates `data` in place by adding derived columns.
    '''
    # Derived columns used for grouping/aggregation below.
    data['msec_num'] = data['msec'].apply(parse_nginx_float)
    data['request_time_num'] = data['request_time'].apply(parse_nginx_float)
    data['dt'] = data['time_local'].apply(parse_nginx_time)
    data['service'] = data['request'].apply(guess_service)
    data['req_type'] = data['request'].apply(guess_req_type)
    data['backend_module'] = data['request'].apply(guess_backend_module)
    ### totals: group everything under the single key True
    group_total = data.groupby(lambda x: True)
    yield 'TotalQPS', 'Count/Second', group_total.size()[True] / time_slice
    # One column per entry in _percentiles_funs (Mean/P75/P95).
    latency_total = group_total['request_time_num'].agg(_percentiles_funs)
    for p in latency_total.columns:
        yield 'TotalLatency{0}'.format(p), 'Seconds', latency_total[p][True]
    ### service
    group_by_service = data.groupby('service')
    count_by_service = group_by_service.size()
    for service in count_by_service.index:
        yield '{0}QPS'.format(service), 'Count/Second', count_by_service[service] / time_slice
    latency_by_service = group_by_service['request_time_num'].agg(_percentiles_funs)
    for service in latency_by_service.index:
        for p in latency_by_service.columns:
            yield '{0}Latency{1}'.format(service, p), 'Seconds', latency_by_service[p][service]
    ### backend_module
    group_by_backend_module = data.groupby('backend_module')
    count_by_backend_module = group_by_backend_module.size()
    # Fixed module list so metrics keep stable names; a module absent
    # from this batch is simply skipped.
    for backend_module in ['BackendConstructor', 'BackendCollector', 'BackendUser', 'BackendStatic', 'BackendOther']:
        if backend_module in count_by_backend_module:
            yield '{0}QPS'.format(backend_module), 'Count/Second', count_by_backend_module[backend_module] / time_slice
    latency_by_backend_module = group_by_backend_module['request_time_num'].agg(_percentiles_funs)
    for backend_module in ['BackendConstructor', 'BackendCollector', 'BackendUser', 'BackendStatic', 'BackendOther']:
        for p in latency_by_backend_module.columns:
            if backend_module in latency_by_backend_module[p]:
                yield '{0}Latency{1}'.format(backend_module, p), 'Seconds', latency_by_backend_module[p][backend_module]
def write_data(metrics):
    '''
    Publish metric tuples to AWS CloudWatch in eu-west-1, one API call
    per metric, under the 'ReadyMag' namespace.

    metrics :: iterable of (name :: str, unit :: str, value :: float)
    '''
    connection = boto.ec2.cloudwatch.connect_to_region(
        'eu-west-1',
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY)
    for metric_name, metric_unit, metric_value in metrics:
        connection.put_metric_data(
            'ReadyMag', metric_name, unit=metric_unit, value=metric_value)
if __name__ == '__main__':
    # Load the persisted tail offsets from the previous run, if any.
    # BUG FIX: the original used the Python 2 `file()` builtin and never
    # closed either handle; `with open(...)` closes (and flushes) them.
    if os.path.exists(STATE_FILE):
        with open(STATE_FILE) as state_file:
            state = json.load(state_file)
    else:
        state = {}

    state, lines, time_slice = load_data(state, LOG_FILE)

    # Persist the updated offsets right away so a failure further down
    # does not make the next run re-report the same lines.
    with open(STATE_FILE, 'w+') as state_file:
        json.dump(state, state_file)

    data = parse_json_data(lines)
    if len(data) > 0:
        metrics = process_data(data, time_slice)
        write_data(metrics)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment