Group by a list of keys, sum by a list of values
#!/usr/bin/env python
# encoding: utf-8
"""
perf.py - Aggregates rows to minute interval
by fieds: project, script, context, container, mode, host
summing fields: sum, cnt, min, max, avg
NOTE! ONLY cron usage with 5 min interval.
Created by Denis Bondar.
"""
import sys, re
import time
import datetime
import _mysql
import MySQLdb
import MySQLdb.constants.CLIENT as CL
import json
from itertools import groupby, imap
from pprint import pprint as pp
Error = MySQLdb.DatabaseError
'''
USE perf2;

DROP TABLE `timerlog`;
DROP TABLE `timerlog_dict`;

CREATE TABLE `timerlog_dict` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `project` char(64) DEFAULT NULL,
  `script` char(64) DEFAULT NULL,
  `context` char(64) DEFAULT NULL,
  `container` char(64) DEFAULT NULL,
  `mode` char(64) DEFAULT NULL,
  `hostname` char(64) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `mult_key` (`project`,`script`,`context`,`container`,`mode`,`hostname`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE `timerlog` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `dict_id` int(10) unsigned NOT NULL,
  `ts` int(10) unsigned NOT NULL DEFAULT '0',
  `sum` int(10) unsigned NOT NULL DEFAULT '0',
  `cnt` int(10) unsigned NOT NULL DEFAULT '0',
  `min` int(10) unsigned NOT NULL DEFAULT '0',
  `max` int(10) unsigned NOT NULL DEFAULT '0',
  `avg` int(10) unsigned NOT NULL DEFAULT '0',
  PRIMARY KEY (`id`),
  FOREIGN KEY (dict_id) REFERENCES timerlog_dict(id) ON DELETE CASCADE,
  KEY `dict_ts` (`dict_id`,`ts`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
'''
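# Illustrative read-back query (a sketch, not used by this script): joins the
# aggregated log rows back to their label dictionary.
#   SELECT d.project, d.script, d.hostname, l.ts, l.sum, l.cnt, l.min, l.max, l.avg
#   FROM perf2.timerlog l JOIN perf2.timerlog_dict d ON d.id = l.dict_id
#   WHERE l.ts >= UNIX_TIMESTAMP(NOW() - INTERVAL 1 HOUR);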
""" Main SQL to fetch raw rows """
select_sql = """SELECT t.id as id,
(ROUND(t.ts/60)*60) as ts,
t.sum as sum,
t.cnt as cnt,
t.min as min,
t.max as max,
t.avg as avg,
NULLIF(hs.hostname,'-') as hostname,
NULLIF(hs.script,'-') as script,
group_concat(tv.tag order by tv.tag separator ',') as tag,
group_concat(tv.value order by tv.tag separator ',') as value
FROM perf.timers t, perf.hostscript hs, perf.timertags tg, perf.tag_values tv
WHERE t.ts >=UNIX_TIMESTAMP(DATE_FORMAT(NOW() - INTERVAL 5 MINUTE, '%Y-%m-%d %H:%i')) AND t.ts<UNIX_TIMESTAMP(DATE_FORMAT(NOW(), '%Y-%m-%d %H:%i'))
AND t.hsid=hs.id
AND t.id=tg.timer_id
AND tg.tag_value_id=tv.id
GROUP BY t.id
"""
insert_sql = """INSERT INTO perf2.timerlog (ts, dict_id, sum, cnt, min, max, avg)
VALUES (%s, %s, %s, %s, %s, %s, %s)
"""
# Note! THE ORDER IS IMPORTANT
select_dict_sql = """SELECT id, CONCAT_WS(',',project, script, container, context, hostname, mode) as con FROM perf2.timerlog_dict"""
# Note! THE ORDER IS IMPORTANT
insert_dict_sql = """INSERT IGNORE INTO perf2.timerlog_dict (project, script, container, context, hostname, mode)
VALUES ('%s', '%s', '%s', '%s', '%s', '%s')
"""
class MySQL:
    """ Basic MySQL connection functionality """
    _conn = None
    host = "127.0.0.1"
    unix_socket = "/var/run/mysqld/mysqld.sock"
    username = "root"
    password = ""
    port = 3306

    def __init__(self, init_connect=None):
        self.init_connect = init_connect

    def reconnect(self):
        """ (Re)open the connection, retrying until it succeeds """
        self._conn = None
        while self._conn is None:
            try:
                self._conn = _mysql.connect(
                    unix_socket=self.unix_socket,
                    user=self.username,
                    passwd=self.password,
                    init_command="SET SESSION wait_timeout=5",
                    client_flag=CL.MULTI_STATEMENTS | CL.MULTI_RESULTS
                )
                if self.init_connect:
                    self.q(self.init_connect)
            except MemoryError:
                print sys.exc_info()
                sys.exit(1)
            except MySQLdb.Error:
                print sys.exc_info()
                time.sleep(1)

    def q(self, query, use_result=True):
        """ Run a query, retrying once; returns a list of row dicts (or None on failure) """
        # Establish connection if not existing or fails to ping
        if not self._conn:
            self.reconnect()
        # We will retry just once - reconnect has infinite loop though
        for attempt in (True, False):
            try:
                self._conn.query(query)
                break  # if successful
            except MySQLdb.OperationalError:
                if attempt:
                    self.reconnect()
                    continue
                else:
                    return None
        if use_result:
            ret = []
            while True:
                r = self._conn.store_result()
                if not r:
                    # Certain statement sequences will
                    # produce multiple NULL resultsets,
                    # so we have to handle these properly
                    # ^^ - this is not a Haiku
                    if self._conn.next_result() < 0:
                        break
                    else:
                        continue
                while True:
                    row = r.fetch_row(how=1)
                    if row:
                        ret.append(row[0])
                    else:
                        break
            return ret
        else:
            return
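# Usage sketch (illustrative): the low-level _mysql driver returns every column
# value as a string, and fetch_row(how=1) yields dicts keyed by column name.
#   db = MySQL()
#   db.q("SELECT 1 AS one")               # -> [{'one': '1'}]
#   db.q("SET @x := 1", use_result=False)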
def itemgetter(keys, rowtype=tuple):
    """ Custom itemgetter for multiple keys """
    def getitem(value):
        return rowtype(value[key] for key in keys)
    return getitem
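# Illustrative: itemgetter([0, 2])(('a', 'b', 'c')) -> ('a', 'c'); works with any
# indexable row, e.g. itemgetter(['id'])(row_dict) on the dicts returned by MySQL.q().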
def group_results(table, keys, values):
    """ Group rows by a list of key indices, summing a list of value indices """
    get_key = itemgetter(keys)
    get_value = itemgetter(values)
    grouped = {}
    for row in table:
        key = get_key(row)
        value = get_value(row)
        if key in grouped:
            grouped[key] = tuple(a + b for a, b in zip(grouped[key], value))
        else:
            grouped[key] = value
    return [k + v for k, v in sorted(grouped.iteritems())]
def fetch_test():
    """ Sample rows for testing without a database """
    tuples = [
        (1340373840, 'socdwar.ru', '/entry_point.php', 'user|tutorial_step', 'node', 'read2', 'socdwar10', 1, 2, 3, 4, 5),
        (1340373840, 'socdwar.ru', '/entry_point.php', 'user|tutorial_step', 'node', 'read', 'socdwar10', 1, 2, 3, 4, 5),
        (1340373840, 'socdwar.ru', '/entry_point.php', 'user|tutorial_step', 'node', 'read', 'socdwar10', 1, 2, 3, 4, 5),
        (1340373840, 'socdwar.ru', '/private/fsfeedback.php', None, 'db', 'read', 'socdwar15', 1, 2, 3, 4, 5),
        (1340373840, 'socdwar.ru', '/private/fsfeedback.php', None, 'db', 'read', 'socdwar7', 1, 2, 3, 4, 5),
    ]
    return tuples
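# Illustrative: grouping the test rows on key indices 0-6 and summing value
# indices 7-11 collapses the two identical 'read'/'socdwar10' rows into one
# row ending in (2, 4, 6, 8, 10), leaving 4 grouped rows in total:
#   group_results(fetch_test(), range(7), range(7, 12))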
#@profile
def fetch_rows():
    """Returns a dict consisting of logs and a dictionary (project, script, container, context, hostname, mode)."""
    item = {
        'logs': [],
        'dict': []
    }
    rows = MySQL().q(select_sql)
    for row in rows:
        _con_values = [(s or '-') for s in row['value'].split(',')]
        _values = "%s,%s,%s" % ((row['hostname'] or '-'), (row['script'] or '-'), ','.join(_con_values))
        tup_dict = tuple(_values.split(','))
        if tup_dict not in item['dict']:
            item['dict'].append(tup_dict)
        tup_log = (int(row['ts']), _values, int(row['sum']), int(row['cnt']), int(row['min']), int(row['max']), int(row['avg']))
        item['logs'].append(tup_log)
    return item
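# Illustrative shape of the returned structure (field values are made up):
#   item['dict'] -> [('socdwar10', '/entry_point.php', 'node', ...), ...]
#   item['logs'] -> [(1340373840, 'socdwar10,/entry_point.php,node,...', 12, 3, 1, 9, 4), ...]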
def insert_rows(rows, log_dict):
    """Inserts a list of grouped tuples, mapping each label key to its dict_id."""
    for row in rows:
        values = list(row)
        values[1] = int(log_dict[values[1]])
        MySQL().q(insert_sql % tuple(values), False)
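# Illustrative: a grouped row (1340373840, 'socdwar10,/entry_point.php,...', 12, 3, 1, 9, 4)
# becomes (1340373840, 17, 12, 3, 1, 9, 4) before the INSERT, where 17 is the
# (assumed) id of that label combination in perf2.timerlog_dict.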
def print_rows(rows):
    for t in rows:
        #print "(%s, %s, %s, %s, %s, %s, %s, %s)," % (ts, hostname, script, context, container, mode, host, total)
        print t
def reload_dict(rows):
    """Insert the dictionary rows and return a fresh copy of the dictionary."""
    for row in rows:
        #print insert_dict_sql % row
        MySQL().q(insert_dict_sql % row, False)
    items = MySQL().q(select_dict_sql)
    log_dict = {}
    for item in items:
        log_dict[item['con']] = item['id']
    return log_dict
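# Illustrative: reload_dict() returns a mapping from the comma-joined label key
# to the row id, e.g. {'socdwar10,/entry_point.php,node,...': '17'}; ids come
# back as strings from _mysql, hence the int() cast in insert_rows().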
def print_usage():
    print """usage: ./%s cron
    insert rows from the last 5 minutes
""" % sys.argv[0]
    sys.exit(1)
def main():
    args = sys.argv[1:]
    if not args:
        print_usage()
    if args[0] != 'cron':
        print_usage()
    # Fetch raw data
    rows = fetch_rows()
    print "Rows log:", len(rows['logs'])
    print "Rows dict:", len(rows['dict'])
    # Update dictionary (project, script, container, context, hostname, mode)
    rows['dict'] = reload_dict(rows['dict'])
    # Group by key indices (ts, label key), summing the value indices
    grouped = list(group_results(rows['logs'], [0, 1], [2, 3, 4, 5, 6]))
    print "Rows:", len(grouped)
    # Insert grouped rows using the dictionary
    insert_rows(grouped, rows['dict'])

if __name__ == '__main__':
    main()