Last active
August 29, 2015 14:14
-
-
Save samdphillips/ffdbafb30da76681ff17 to your computer and use it in GitHub Desktop.
Random text manipulation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
from collections import namedtuple | |
from datetime import datetime | |
from log_dumb import * | |
def from_json():
    """Build a pipeline stage that parses each record as JSON."""
    decode = json.loads
    return map(decode)
def time_conv(s):
    """Parse a timestamp like '2015-03-12 10:11:12' into a datetime."""
    fmt = '%Y-%m-%d %H:%M:%S'
    return datetime.strptime(s, fmt)
# Stream JSON log records, parse timestamps, and derive a per-minute bucket.
# open() replaces the Python-2-only builtin file() (same semantics here).
log = (pipe(open('somedata.log')) |
       from_json() |
       dict_update('time', time_conv) |
       copy('time', 'time_bucket') |
       dict_update('time_bucket', time_bucket))
def report(tbl): | |
for t in sorted(tbl): | |
stats = tbl[t] | |
count = stats.count | |
print ('%s %6d %10.2f %6d %6d' % | |
(t, count, stats.sum / float(count), stats.max, stats.min)) | |
# Stream the parsed log, aggregating running stats per time bucket and
# printing an interim report every `report_every` records, plus a final one.
report_every = 5000
r_count = report_every
tbl = {}
Stats = namedtuple('Stats', 'sum count max min')
for entry in log:
    bucket = entry['time_bucket']
    # elapsed = entry['elapsed']
    elapsed = entry['size']
    cur = tbl.get(bucket, Stats(0, 0, elapsed, elapsed))
    tbl[bucket] = cur._replace(sum=cur.sum + elapsed,
                               count=cur.count + 1,
                               max=max(cur.max, elapsed),
                               min=min(cur.min, elapsed))
    r_count -= 1
    if r_count == 0:
        r_count = report_every
        report(tbl)
report(tbl)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re | |
import urllib | |
from datetime import datetime | |
from itertools import groupby, ifilter, imap | |
from operator import itemgetter | |
# Matches one Apache-style access-log line, e.g.
#   1.2.3.4 - - [12/03/2015:10:11:12 +0000] "GET /p?a=b HTTP/1.1" 200 1234 56
# FIX: the continuation strings were not raw, so '\[' and '\s' were plain
# string escapes (invalid / deprecation warnings on modern Python); every
# fragment is now an explicit raw-string regex.
ENTRY_PAT = re.compile(r'^(?P<ip_addr>\S+)\s+\S+\s+\S+\s+'
                       r'\[(?P<time>[^]]+?)\]\s+"(?P<method>\w+)\s+'
                       r'(?P<path>\S+)\s+[^"]+"\s+(?P<status>\d+)\s+'
                       r'(?P<size>\d+)\s+(?P<elapsed>\d+)')
class pipe(object):
    """Head of a pipeline: wraps an iterable and feeds it to stages via `|`."""

    def __init__(self, it):
        self.it = it

    def __or__(self, other):
        # The right-hand stage consumes this pipe as its input.
        return other.with_input(self)

    def __iter__(self):
        # BUG FIX: __iter__ must return an iterator; the original returned
        # self.it unchanged, which raises TypeError when `it` is a plain
        # container (e.g. a list). iter() is a no-op on objects that are
        # already iterators, so file objects behave exactly as before.
        return iter(self.it)
class pipeline(object):
    """A deferred chain of stages, composed with `|` and applied later
    via with_input()."""

    def __init__(self, pipes=None):
        # BUG FIX: the original used a mutable default argument (pipes=[]),
        # a shared-state hazard; None-sentinel is the safe equivalent.
        self.pipes = [] if pipes is None else pipes

    def __or__(self, other):
        # Composition is persistent: a new pipeline, existing one untouched.
        return pipeline(self.pipes + [other])

    def with_input(self, pipe):
        """Thread `pipe` through every recorded stage, left to right."""
        for stage in self.pipes:
            pipe = pipe | stage
        return pipe
class map(object):
    """Pipeline stage applying `func` to every item flowing through.

    NOTE: intentionally shadows the builtin `map`; it is used as a stage,
    e.g. ``pipe(src) | map(f)``.
    """

    def __init__(self, func):
        self.func = func

    def with_input(self, p):
        # Lazily apply func. A generator expression replaces the
        # Python-2-only itertools.imap with identical (lazy) semantics,
        # so this works unchanged on Python 2 and 3.
        func = self.func
        return pipe(func(x) for x in p.it)
class dict_update(map):
    """Stage that rewrites d[field] through `fixup`, copying the dict first.

    Dicts lacking `field` pass through unchanged (and uncopied).
    """

    def __init__(self, field, fixup):
        self.field = field
        self.fixup = fixup

    def func(self, d):
        if self.field not in d:
            return d
        updated = d.copy()
        updated[self.field] = self.fixup(d[self.field])
        return updated
def copy(old, new):
    """Stage copying the value under key `old` to key `new` (if present)."""
    def duplicate(d):
        if old not in d:
            return d
        result = d.copy()
        result[new] = d[old]
        return result
    return map(duplicate)
def fixup_num(*fields):
    """Pipeline stage converting every named field to int."""
    stages = pipeline()
    for name in fields:
        stages = stages | dict_update(name, int)
    return stages
class head(object):
    """Stage passing through at most `count` items, then stopping."""

    def __init__(self, count):
        self.count = count
        self.seen = 0  # items yielded so far

    def with_input(self, p):
        self.it = p.it
        return pipe(self)

    def __iter__(self):
        return self

    def next(self):
        if self.seen == self.count:
            raise StopIteration
        self.seen += 1
        # FIX: the builtin next() works on both Python 2 and 3 iterators,
        # unlike the original self.it.next(), which is Python-2-only.
        return next(self.it)

    # Python 3 iterator protocol alias (harmless on Python 2).
    __next__ = next
def time_conv(s):
    """Parse an access-log timestamp like '12/03/2015:10:11:12 +0000'."""
    fmt = '%d/%m/%Y:%H:%M:%S +0000'
    return datetime.strptime(s, fmt)
def extract_url_query(d):
    """Split d['path'] into 'url' and parsed 'query' entries.

    Returns a copy of `d` with 'path' removed, 'url' set to the path part,
    and 'query' a dict mapping each variable to a list of comma-separated,
    URL-unquoted values. The input dict is not mutated.
    """
    # Py2/py3-compatible import; the original relied on the Python-2-only
    # module-level urllib.unquote.
    try:
        from urllib import unquote          # Python 2
    except ImportError:
        from urllib.parse import unquote    # Python 3
    url = d['path']
    dnew = d.copy()
    query = {}
    if '?' in url:
        url, query_s = url.split('?', 1)
        for qv in query_s.split('&'):
            # ROBUSTNESS FIX: partition tolerates values containing '='
            # and bare flags without a value, where the original strict
            # qv.split('=') unpacking raised ValueError.
            var, _, val = qv.partition('=')
            query[var] = unquote(val).split(',')
    del dnew['path']
    dnew['url'] = url
    dnew['query'] = query
    return dnew
def time_bucket(dt):
    """Truncate a datetime to the start of its minute.

    NOTE(review): microseconds are left untouched; timestamps parsed by
    strptime here have none, so this appears intentional — confirm.
    """
    return dt.replace(second=0)
def count(it):
    """Consume `it`, returning how many items it produced."""
    return sum(1 for _ in it)
# Character-class tags: Letter, Number, Other.
tags = { 4: 'L', 2: 'N', 1: 'O' }

def classify(c):
    """Return the class code for one character: 4=alpha, 2=digit, 1=other."""
    if c.isalpha():
        return 4
    if c.isdigit():
        return 2
    return 1

def normalize(s):
    """Collapse `s` into a run-shape string, e.g. 'abc12--' -> 'LNO'.

    Emits one tag per run of same-class characters, so equal shapes hash
    together regardless of run length.
    """
    runs = []
    previous = 0
    for ch in s:
        cls = classify(ch)
        if cls != previous:
            runs.append(tags[cls])
            previous = cls
    return ''.join(runs)
if __name__ == '__main__':
    # Parse the access log into dicts and derive url / url_hash / time.
    # FIX: open() replaces the Python-2-only builtin file(); prints use the
    # single-argument parenthesized form, valid on Python 2 and 3 alike.
    log = (pipe(open('somedata.log', 'r')) |
           map(lambda x: ENTRY_PAT.match(x).groupdict()) |
           map(extract_url_query) |
           fixup_num('size', 'elapsed') |
           copy('url', 'url_hash') |
           dict_update('url_hash', normalize) |
           dict_update('time', time_conv))
    # for x in log | dict_update('time', time_bucket) | head(5):
    #     print(x)
    d = {}  # url_hash -> hit count
    r = {}  # url_hash -> first concrete url seen for that shape
    t = {}  # url_hash -> time of first hit
    for ent in log:
        h = ent['url_hash']
        if h not in r:
            r[h] = ent['url']
            t[h] = ent['time']
        d[h] = d.get(h, 0) + 1
    for k, v in d.items():
        print('%s %-20s %3d %10d %s' % (t[k], k[:20], len(k), v, r[k]))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
class _SplitState(object): | |
def __init__(self, previous_state): | |
self.split = previous_state.split | |
self.split.state = self | |
self.on_enter(previous_state) | |
def on_enter(self, previous_state): | |
pass | |
def on_exit(self): | |
pass | |
def change_state(self, state_cls): | |
self.on_exit() | |
return state_cls(self) | |
def read_buffer(self, start, size): | |
self.file.seek(start) | |
self.buffer = self.file.read(size) | |
self.pos = 0 | |
def __repr__(self): | |
return self.__class__.__name__ | |
class INIT(_SplitState):
    """Initial state: opens the file and aligns the split to a line start."""

    # NOTE(review): this __slots__ is ineffective — the base class defines
    # no __slots__, so instances still carry a __dict__.
    __slots__ = ['file_name', 'buffer', 'file', 'offset', 'size']

    def on_enter(self, state):
        # Carry over the split description (set on the dummy pre-init state
        # that Split.__init__ builds).
        self.file_name = state.file_name
        self.offset = state.offset
        self.size = state.size

    def open_file(self):
        self.file = open(self.file_name, 'r')

    def find_previous_line(self):
        """Move self.offset back to the nearest line start at or before it.

        Reads the 1024 bytes ending at the original offset and finds the
        last newline in them; the split then starts just after that newline,
        and its size grows by however far the start moved back.
        NOTE(review): assumes offset is 0 or >= 1024 — a positive offset
        below 1024 would seek to a negative position; confirm chunk sizes
        guarantee this.
        """
        if self.offset == 0:
            return
        j = self.offset - 1024
        self.read_buffer(j, 1024)
        i = self.buffer.rfind('\n')
        o = j + i + 1
        self.pos = i + 1
        self.size = self.size + (self.offset - o)
        self.offset = o

    def readline(self):
        # Prepare the file and the aligned boundary, then delegate the
        # actual line reading to the READ state.
        self.open_file()
        self.find_previous_line()
        return self.change_state(READ).readline()
class READ(_SplitState):
    """Active state: streams newline-terminated lines until the split's
    byte budget is spent, then hands off to DONE."""

    def on_enter(self, state):
        # Inherit the buffer/cursor from INIT so the partial buffer read
        # during line alignment is not wasted.
        self.buffer = state.buffer
        self.offset = state.offset
        self.pos = state.pos
        self.size = state.size
        self.file = state.file
        self.size_read = 0  # bytes of this split consumed so far

    def readline(self):
        """Return the next full line including its trailing newline, or
        delegate to DONE (returning '') once reading the line would exceed
        the split's size budget."""
        prefix = []  # fragments of a line that spans buffer boundaries
        size = 0     # bytes of the current line taken from earlier buffers
        while True:
            i = self.buffer.find('\n', self.pos)
            if i != -1:
                break
            # No newline in this buffer: bank the fragment, check budget,
            # then pull the next 1024-byte window.
            size += 1024 - self.pos
            if size + self.size_read > self.size:
                return self.change_state(DONE).readline()
            prefix.append(self.buffer[self.pos:])
            # NOTE(review): self.offset is never advanced after a refill,
            # so this start offset looks wrong for lines spanning three or
            # more buffers — verify against real data.
            self.read_buffer(self.offset + 1024 - self.pos, 1024)
        if size + self.size_read + i > self.size:
            return self.change_state(DONE).readline()
        line = ''.join(prefix) + self.buffer[self.pos:i+1]
        self.size_read += size + i + 1
        self.pos = i + 1
        return line

    def on_exit(self):
        # Leaving READ (i.e. entering DONE) closes the underlying file.
        self.file.close()

    def __repr__(self):
        return '<{0} pos={1.pos}>'.format(
            self.__class__.__name__, self)
class DONE(_SplitState):
    """Terminal state: the split is exhausted, so readline yields ''."""

    def readline(self):
        return ''
class Split(object):
    """One byte-range of a file, read line-by-line via a small state
    machine (INIT -> READ -> DONE)."""

    def __init__(self, file_name, offset, size):
        # Seed INIT with a throwaway object shaped like a previous state.
        seed = type('', (object,), {})
        seed.split = self
        seed.file_name = file_name
        seed.offset = offset
        seed.size = size
        self.state = INIT(seed)

    def __repr__(self):
        return '<{0} state={1}>'.format(
            self.__class__.__name__,
            self.state)

    def readline(self):
        # Delegate to whatever state the machine is currently in.
        return self.state.readline()
DEFAULT_CHUNK_SIZE = 10 * 1024 * 1024

def adjust_chunk_size(sz_chunks, file_size):
    """Shrink sz_chunks until the trailing partial chunk is not 'small'.

    A chunk size is accepted when the remainder (file_size % sz_chunks) is
    either zero (the chunks divide the file exactly — no trailing chunk at
    all) or more than half a chunk.

    BUG FIX: the original treated remainder == 0 as 'too small' and kept
    shrinking, which for many inputs walked sz_chunks all the way to 0 and
    raised ZeroDivisionError (e.g. sz_chunks=4, file_size=10). Zero
    remainder is now accepted, and sz_chunks is floored at 1.
    """
    while sz_chunks > 1:
        last_chunk_sz = file_size % sz_chunks
        if last_chunk_sz == 0 or last_chunk_sz > sz_chunks / 2:
            return sz_chunks
        sz_chunks -= 1
    return 1
def size_chunker(sz_chunks, file_size):
    """Yield (offset, size) pairs covering the file in ~sz_chunks pieces.

    The chunk size is first adjusted so the final chunk is not tiny; the
    last pair is clipped to the end of the file.
    """
    sz_chunks = adjust_chunk_size(sz_chunks, file_size)
    offset = 0
    while offset < file_size:
        size = min(sz_chunks, file_size - offset)
        yield offset, size
        offset += size
def count_chunker(nr_chunks, file_size):
    """Chunk the file into roughly nr_chunks pieces.

    FIX: floor division (//) keeps the chunk size an int on Python 3 as
    well; it is identical to the original `/` under Python 2 int division.
    """
    return size_chunker(file_size // nr_chunks, file_size)
def Splitter(file_name, sz_chunks=None, nr_chunks=None):
    """Yield Split objects covering file_name.

    At most one of sz_chunks (bytes per split) or nr_chunks (number of
    splits) may be given; with neither, DEFAULT_CHUNK_SIZE is used.

    Raises:
        ValueError: if both sz_chunks and nr_chunks are specified.
        (ValueError subclasses Exception, so existing callers catching
        the original bare Exception still work.)
    """
    if sz_chunks is not None and nr_chunks is not None:
        raise ValueError('cannot specify both sz_chunks and nr_chunks')
    if sz_chunks is None and nr_chunks is None:
        sz_chunks = DEFAULT_CHUNK_SIZE
    file_size = os.stat(file_name).st_size
    if sz_chunks is None:
        chunker = count_chunker(nr_chunks, file_size)
    else:
        chunker = size_chunker(sz_chunks, file_size)
    for offset, size in chunker:
        yield Split(file_name, offset, size)
def readit(split):
    """Module-level helper (so multiprocessing can pickle it): read one
    line from the given split."""
    return split.readline()
if __name__ == '__main__':
    # Read the first line of every split in parallel across worker
    # processes.
    from multiprocessing.pool import Pool
    pool = Pool()
    print(pool.map(readit, Splitter('bad_time.log')))
    # BUG FIX: Pool.join() requires the pool to be closed (or terminated)
    # first; the original called join() alone, which raises.
    pool.close()
    pool.join()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Parse the access log into dicts with url / url_hash / time fields.
# FIX: open() replaces the Python-2-only builtin file().
log = (pipe(open('bad_time.log', 'r')) |
       map(lambda x: ENTRY_PAT.match(x).groupdict()) |
       map(extract_url_query) |
       fixup_num('size', 'elapsed') |
       copy('url', 'url_hash') |
       dict_update('url_hash', normalize) |
       dict_update('time', time_conv))
# for x in log | dict_update('time', time_bucket) | head(5):
#     print(x)
import json | |
class DTEncoder(json.JSONEncoder):
    """JSON encoder that renders datetime values via str(); everything else
    falls back to the standard encoder (and its TypeError)."""

    def default(self, o):
        if not isinstance(o, datetime):
            return super(DTEncoder, self).default(o)
        return str(o)
import sys  # BUG FIX: sys.stdout is used below but sys was never imported here

# Dump, as JSON, every event whose timestamp falls in the 20-minute window
# starting at 04:00 on the day of the first event seen.
start = None
end = None
for ev in log:
    if start is None:
        first = ev['time']
        start = first.replace(hour=4, minute=0, second=0)
        end = first.replace(hour=4, minute=20, second=0)
    if start <= ev['time'] < end:
        json.dump(ev, sys.stdout, cls=DTEncoder)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment