Created
August 1, 2012 08:56
-
-
Save bondario/3225181 to your computer and use it in GitHub Desktop.
Group by list of keys sum by list of values
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# encoding: utf-8 | |
""" | |
perf.py - Aggregates rows to minute interval | |
by fieds: project, script, context, container, mode, host | |
summing fields: sum, cnt, min, max, avg | |
NOTE! ONLY cron usage with 5 min interval. | |
Created by Denis Bondar. | |
""" | |
import sys, re | |
import time | |
import datetime | |
import _mysql | |
import MySQLdb | |
import MySQLdb.constants.CLIENT as CL | |
import json | |
from itertools import groupby, imap | |
from pprint import pprint as pp | |
Error = MySQLdb.DatabaseError | |
''' | |
USE perf2; | |
DROP TABLE `timerlog`; | |
DROP TABLE `timerlog_dict`; | |
CREATE TABLE `timerlog_dict` ( | |
`id` int(10) unsigned NOT NULL AUTO_INCREMENT, | |
`project` char(64) DEFAULT NULL, | |
`script` char(64) DEFAULT NULL, | |
`context` char(64) DEFAULT NULL, | |
`container` char(64) DEFAULT NULL, | |
`mode` char(64) DEFAULT NULL, | |
`hostname` char(64) DEFAULT NULL, | |
PRIMARY KEY (`id`), | |
UNIQUE KEY `mult_key` (`project`,`script`,`context`,`container`,`mode`,`hostname`) | |
) ENGINE=InnoDB DEFAULT CHARSET=utf8; | |
CREATE TABLE `timerlog` ( | |
`id` int(10) unsigned NOT NULL AUTO_INCREMENT, | |
`dict_id` int(10) unsigned NOT NULL, | |
`ts` int(10) unsigned NOT NULL DEFAULT '0', | |
`sum` int(10) unsigned NOT NULL DEFAULT '0', | |
`cnt` int(10) unsigned NOT NULL DEFAULT '0', | |
`min` int(10) unsigned NOT NULL DEFAULT '0', | |
`max` int(10) unsigned NOT NULL DEFAULT '0', | |
`avg` int(10) unsigned NOT NULL DEFAULT '0', | |
PRIMARY KEY (`id`), | |
FOREIGN KEY (dict_id) REFERENCES timerlog_dict(id) ON DELETE CASCADE, | |
KEY `dict_ts` (`dict_id`,`ts`) | |
) ENGINE=InnoDB DEFAULT CHARSET=utf8; | |
''' | |
""" Main SQL to fetch raw rows """ | |
select_sql = """SELECT t.id as id, | |
(ROUND(t.ts/60)*60) as ts, | |
t.sum as sum, | |
t.cnt as cnt, | |
t.min as min, | |
t.max as max, | |
t.avg as avg, | |
NULLIF(hs.hostname,'-') as hostname, | |
NULLIF(hs.script,'-') as script, | |
group_concat(tv.tag order by tv.tag separator ',') as tag, | |
group_concat(tv.value order by tv.tag separator ',') as value | |
FROM perf.timers t, perf.hostscript hs, perf.timertags tg, perf.tag_values tv | |
WHERE t.ts >=UNIX_TIMESTAMP(DATE_FORMAT(NOW() - INTERVAL 5 MINUTE, '%Y-%m-%d %H:%i')) AND t.ts<UNIX_TIMESTAMP(DATE_FORMAT(NOW(), '%Y-%m-%d %H:%i')) | |
AND t.hsid=hs.id | |
AND t.id=tg.timer_id | |
AND tg.tag_value_id=tv.id | |
GROUP BY t.id | |
""" | |
insert_sql = """INSERT INTO perf2.timerlog (ts, dict_id, sum, cnt, min, max, avg) | |
VALUES (%s, %s, %s, %s, %s, %s, %s) | |
""" | |
# Note! THE ORDER IS IMPORTANT | |
select_dict_sql = """SELECT id, CONCAT_WS(',',project, script, container, context, hostname, mode) as con FROM perf2.timerlog_dict""" | |
# Note! THE ORDER IS IMPORTANT | |
insert_dict_sql = """INSERT IGNORE INTO perf2.timerlog_dict (project, script, container, context, hostname, mode) | |
VALUES ('%s', '%s', '%s', '%s', '%s', '%s') | |
""" | |
class MySQL: | |
_conn = None | |
host = "127.0.0.1" | |
unix_socket = "/var/run/mysqld/mysqld.sock" | |
username = "root" | |
password = "" | |
port = 3306 | |
def __init__(self, init_connect=None): | |
self.init_connect = init_connect | |
""" Basic MySQL connection functionality """ | |
def reconnect(self): | |
self._conn = None | |
while self._conn == None: | |
try: | |
self._conn = _mysql.connect( | |
unix_socket=self.unix_socket, | |
user=self.username, | |
passwd=self.password, | |
init_command="SET SESSION wait_timeout=5", | |
client_flag=CL.MULTI_STATEMENTS | CL.MULTI_RESULTS | |
) | |
if self.init_connect: | |
self.q(self.init_connect) | |
except MemoryError: | |
print sys.exc_info() | |
sys.exit(1) | |
except MySQLdb.Error: | |
print sys.exc_info() | |
time.sleep(1) | |
def q(self, query, use_result=True): | |
# Establish connection if not existing or fails to ping | |
if not self._conn: | |
self.reconnect() | |
# We will retry just once - reconnect has infinite loop though | |
for attempt in (True, False): | |
try: | |
self._conn.query(query) | |
break # if successful | |
except MySQLdb.OperationalError: | |
if attempt: | |
self.reconnect() | |
continue | |
else: | |
return None | |
if use_result: | |
ret = [] | |
while True: | |
r = self._conn.store_result() | |
if not r: | |
# Certain statement sequences will | |
# produce multiple NULL resultsets, | |
# so we have to handle these properly | |
# ^^ - this is not a Haiku | |
if self._conn.next_result() < 0: | |
break | |
else: | |
continue | |
while True: | |
row = r.fetch_row(how=1) | |
if row: | |
ret.append(row[0]) | |
else: | |
break | |
return ret | |
else: | |
return | |
def itemgetter(keys, rowtype=tuple): | |
""" Custom itemgetter for multiple keys """ | |
def getitem(value): | |
return rowtype(value[key] for key in keys) | |
return getitem | |
def group_results(table, key, value): | |
""" Grouping by list of keys summing by list of values """ | |
get_key = itemgetter(key) | |
get_value = itemgetter(value) | |
grouped = {} | |
for row in table: | |
key = get_key(row) | |
value = get_value(row) | |
if key in grouped: | |
grouped[key] = tuple(a + b for a, b in zip(grouped[key], value)) | |
else: | |
grouped[key] = value | |
return [k + v for k, v in sorted(grouped.iteritems())] | |
def fetch_test(): | |
tuples = [ | |
(1340373840, 'socdwar.ru', '/entry_point.php', 'user|tutorial_step', 'node', 'read2', 'socdwar10', 1,2,3,4,5), | |
(1340373840, 'socdwar.ru', '/entry_point.php', 'user|tutorial_step', 'node', 'read', 'socdwar10', 1,2,3,4,5), | |
(1340373840, 'socdwar.ru', '/entry_point.php', 'user|tutorial_step', 'node', 'read', 'socdwar10', 1,2,3,4,5), | |
(1340373840, 'socdwar.ru', '/private/fsfeedback.php', None, 'db', 'read', 'socdwar15', 1,2,3,4,5), | |
(1340373840, 'socdwar.ru', '/private/fsfeedback.php', None, 'db', 'read', 'socdwar7', 1,2,3,4,5), | |
] | |
return tuples | |
#@profile | |
def fetch_rows(): | |
"""Returns a dict consist of logs and dictionary (project, script, container, context, hostname, mode).""" | |
item = { | |
'logs': [], | |
'dict': [] | |
} | |
rows = MySQL().q(select_sql) | |
for row in rows: | |
_con_values = [(s or '-') for s in row['value'].split(',')] | |
_values = "%s,%s,%s" % ((row['hostname'] or '-'), (row['script'] or '-'), ','.join(_con_values)) | |
tup_dict = tuple(_values.split(',')) | |
if tup_dict not in item['dict']: | |
item['dict'].append(tup_dict) | |
tup_log = (int(row['ts']), _values, int(row['sum']), int(row['cnt']), int(row['min']), int(row['max']), int(row['avg'])) | |
item['logs'].append(tup_log) | |
return item | |
def insert_rows(rows, dict): | |
"""Inserts a list of tuples.""" | |
for row in rows: | |
values = list(row) | |
values[1] = int(dict[values[1]]) | |
MySQL().q(insert_sql % tuple(values), False) | |
def print_rows(rows): | |
for t in rows: | |
#print "(%s, %s, %s, %s, %s, %s, %s, %s)," % (ts, hostname, script, context, container, mode, host, total) | |
print t | |
def reload_dict(rows): | |
"""Insert a dictionary and return frash copy of it.""" | |
for row in rows: | |
#print insert_dict_sql % row | |
MySQL().q(insert_dict_sql % row, False) | |
items = MySQL().q(select_dict_sql) | |
log_dict = {} | |
for item in items: | |
log_dict[item['con']] = item['id'] | |
return log_dict | |
def print_usage(): | |
print """usage: ./%s cron | |
insert rows from last 5 min | |
""" % sys.argv[0] | |
sys.exit(1) | |
def main(): | |
args = sys.argv[1:] | |
if not args: | |
print_usage() | |
if args[0] != 'cron': | |
print_usage() | |
# Fetch raw data | |
rows = fetch_rows () | |
print "Rows log:", len(rows['logs']) | |
print "Rows dict", len(rows['dict']) | |
# Update dictionary (project, script, container, context, hostname, mode) | |
rows['dict'] = reload_dict(rows['dict']) | |
# Group by keys summing values | |
grouped = list(group_results(rows['logs'], [0,1], [2,3,4,5,6])) | |
print "Rows:", len(grouped) | |
# Insert grouped rows using dictionary | |
insert_rows (grouped, rows['dict']) | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment