Skip to content

Instantly share code, notes, and snippets.

@elephantum
Created February 11, 2014 10:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save elephantum/8932295 to your computer and use it in GitHub Desktop.
Save elephantum/8932295 to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
# apt-get install python-pandas python-boto
AWS_ACCESS_KEY_ID =
AWS_SECRET_ACCESS_KEY =
import datetime
import gzip
import json
import os
import pandas
import sys
import time
import re
import numpy
import boto.ec2.cloudwatch
parts_re = re.compile(r'(\w+) /?(/[^ ]*) .*')
url_1part_re = re.compile(r'(/[^/]+).*')
url_2parts_re = re.compile(r'(/[^/]+(/[^/?]+)?).*')
# TODO inode support
# {
# filename: {
# last_size :: int pointer offset
# last_ts :: timestamp
# }
# }
STATE_FILE = '/home/elephantum/rm-monitoring/state.json'
LOG_FILE = '/var/log/rm-release/nginx-access.log'
def load_data(state, filename):
'''
return: (state, lines :: [str], time_slice :: float seconds)
'''
cur_size = os.path.getsize(filename)
cur_ts = time.time()
if filename not in state:
state[filename] = {
'last_size': cur_size,
'last_ts': cur_ts
}
time_slice = 0
lines = []
return state, lines, time_slice
else:
file_state = state[filename]
if file_state['last_size'] > cur_size:
file_state['last_size'] = 0
f = file(filename)
f.seek(file_state['last_size'])
lines = f.readlines()
time_slice = cur_ts - file_state['last_ts']
file_state['last_size'] = f.tell()
file_state['last_ts'] = cur_ts
return state, lines, time_slice
def parse_json_data(lines):
parsed_data = []
for line in lines:
try:
line_data = json.loads(line)
parsed_data.append(line_data)
except:
pass
return pandas.DataFrame(parsed_data)
def parse_nginx_time(x):
return datetime.datetime.strptime(x, '%d/%b/%Y:%H:%M:%S +0000')
def parse_nginx_float(x):
return float(x) if x <> '-' else 0
def guess_service(x):
m = parts_re.match(x)
if not m: return 'Other'
(verb, uri) = m.groups()
if uri.startswith('/api'): return 'Backend'
if uri.startswith('/screenshot'): return 'Screenshot'
if any([uri.startswith(i) for i in ['/img', '/js', '/build', '/css']]): return 'Static'
return 'Frontend'
def guess_backend_module(x):
m = parts_re.match(x)
if not m: return 'Unknown'
(verb, uri) = m.groups()
if any([uri.startswith(i) for i in [
'/api/mag',
'/api/magFull',
'/api/widgetsGroup',
'/api/publish',
'/api/unpublish',
'/api/checkdomain',
'/api/domainmaptest',
'/api/monitoring',
'/api/pageOrder',
'/api/page_templates',
'/api/widget',
'/api/guides',
'/api/imageeffect',
'/api/imageeffects',
'/api/imagefinal',
'/api/imagepreview',
'/api/imagescale',
'/api/page',
]]): return 'BackendConstructor'
if any([uri.startswith(i) for i in [
'/api/upload',
'/api/template_images',
]]): return 'BackendStatic'
if any([uri.startswith(i) for i in [
'/api/authservice',
'/api/user',
'/api/me',
'/api/register',
'/api/checkusername',
'/api/redirect',
'/api/changepass',
'/api/ping',
'/api/auth',
'/api/recoverypass',
]]): return 'BackendUser'
if any([uri.startswith(i) for i in [
'/api/readymag',
'/api/readymags', # i know it's redundant
'/api/embed/readymag',
'/api/domain/readymag',
'/api/domain/readymags/user',
'/api/likemag',
'/api/unlikemag',
'/api/likers/mag',
'/api/magpages',
'/api/countView',
'/api/countview',
'/api/datetime',
]]): return 'BackendCollector'
if uri.startswith('/api'): return 'BackendOther'
def guess_req_type(x):
m = parts_re.match(x)
if not m: return '???'
(verb, uri) = m.groups()
if any([uri.startswith(i) for i in ('/screenshot', '/img', '/build', '/js')]):
return url_1part_re.match(uri).group(1)
if uri.startswith('/api'):
return url_2parts_re.match(uri).group(1)
if uri == '/': return '/'
return 'other'
_percentiles_funs = {
'Mean': lambda x: numpy.mean(x),
'P75': lambda x: numpy.percentile(x, 75),
# 'P90': lambda x: numpy.percentile(x, 90),
'P95': lambda x: numpy.percentile(x, 95),
}
def process_data(data, time_slice):
'''
data :: pandas.DataFrame
time_slice :: float seconds since last data
return (metric_name :: str, value :: float)
'''
data['msec_num'] = data['msec'].apply(parse_nginx_float)
data['request_time_num'] = data['request_time'].apply(parse_nginx_float)
data['dt'] = data['time_local'].apply(parse_nginx_time)
data['service'] = data['request'].apply(guess_service)
data['req_type'] = data['request'].apply(guess_req_type)
data['backend_module'] = data['request'].apply(guess_backend_module)
group_total = data.groupby(lambda x: True)
yield 'TotalQPS', 'Count/Second', group_total.size()[True] / time_slice
latency_total = group_total['request_time_num'].agg(_percentiles_funs)
for p in latency_total.columns:
yield 'TotalLatency{0}'.format(p), 'Seconds', latency_total[p][True]
### service
group_by_service = data.groupby('service')
count_by_service = group_by_service.size()
for service in count_by_service.index:
yield '{0}QPS'.format(service), 'Count/Second', count_by_service[service] / time_slice
latency_by_service = group_by_service['request_time_num'].agg(_percentiles_funs)
for service in latency_by_service.index:
for p in latency_by_service.columns:
yield '{0}Latency{1}'.format(service, p), 'Seconds', latency_by_service[p][service]
### backend_module
group_by_backend_module = data.groupby('backend_module')
count_by_backend_module = group_by_backend_module.size()
for backend_module in ['BackendConstructor', 'BackendCollector', 'BackendUser', 'BackendStatic', 'BackendOther']:
if backend_module in count_by_backend_module:
yield '{0}QPS'.format(backend_module), 'Count/Second', count_by_backend_module[backend_module] / time_slice
latency_by_backend_module = group_by_backend_module['request_time_num'].agg(_percentiles_funs)
for backend_module in ['BackendConstructor', 'BackendCollector', 'BackendUser', 'BackendStatic', 'BackendOther']:
for p in latency_by_backend_module.columns:
if backend_module in latency_by_backend_module[p]:
yield '{0}Latency{1}'.format(backend_module, p), 'Seconds', latency_by_backend_module[p][backend_module]
def write_data(metrics):
'''
metrics :: (name :: str, unit :: str, value :: float)
'''
cw = boto.ec2.cloudwatch.connect_to_region(
'eu-west-1',
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY)
for name, unit, value in metrics:
cw.put_metric_data('ReadyMag', name, unit=unit, value=value)
if __name__ == '__main__':
if os.path.exists(STATE_FILE):
state = json.load(file(STATE_FILE))
else:
state = {}
state, lines, time_slice = load_data(state, LOG_FILE)
json.dump(state, file(STATE_FILE, 'w+'))
data = parse_json_data(lines)
if len(data) > 0:
metrics = process_data(data, time_slice)
write_data(metrics)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment