Random text manipulation
import json
from collections import namedtuple
from datetime import datetime

from log_dumb import *

def from_json():
    return map(json.loads)

def time_conv(s):
    return datetime.strptime(s, '%Y-%m-%d %H:%M:%S')

log = (pipe(file('somedata.log')) |
       from_json() |
       dict_update('time', time_conv) |
       copy('time', 'time_bucket') |
       dict_update('time_bucket', time_bucket))
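
# A hypothetical input line and the record it becomes after the pipeline
# above (this assumes each log line is a JSON object with at least a
# 'time' and a 'size' field -- adjust to the real log format):
#
#     {"time": "2015-01-28 04:05:17", "size": 512}
#     -> {'time': datetime(2015, 1, 28, 4, 5, 17),
#         'time_bucket': datetime(2015, 1, 28, 4, 5),
#         'size': 512}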
def report(tbl):
    for t in sorted(tbl):
        stats = tbl[t]
        count = stats.count
        print ('%s %6d %10.2f %6d %6d' %
               (t, count, stats.sum / float(count), stats.max, stats.min))
    print

# print an intermediate report every `report_every` records
report_every = 5000
r_count = report_every
tbl = {}
Stats = namedtuple('Stats', 'sum count max min')

for x in log:
    t = x['time_bucket']
    # elapsed = x['elapsed']
    elapsed = x['size']
    stats = tbl.get(t, Stats(0, 0, elapsed, elapsed))
    tbl[t] = stats._replace(sum=stats.sum + elapsed, count=stats.count + 1,
                            max=max(stats.max, elapsed), min=min(stats.min, elapsed))
    r_count = r_count - 1
    if r_count == 0:
        r_count = report_every
        report(tbl)

report(tbl)

import re
import urllib
from datetime import datetime
from itertools import groupby, ifilter, imap
from operator import itemgetter

ENTRY_PAT = re.compile(r'^(?P<ip_addr>\S+)\s+\S+\s+\S+\s+'
                       r'\[(?P<time>[^]]+?)\]\s+"(?P<method>\w+)\s+'
                       r'(?P<path>\S+)\s+[^"]+"\s+(?P<status>\d+)\s+'
                       r'(?P<size>\d+)\s+(?P<elapsed>\d+)')
class pipe(object):
    # Wraps an iterator so processing stages can be chained with `|`.
    def __init__(self, it):
        self.it = it

    def __or__(self, other):
        return other.with_input(self)

    def __iter__(self):
        return self.it

class pipeline(object):
    # A deferred chain of stages that can be applied to a pipe later.
    def __init__(self, pipes=[]):
        self.pipes = pipes

    def __or__(self, other):
        return pipeline(self.pipes + [other])

    def with_input(self, pipe):
        for p in self.pipes:
            pipe = pipe | p
        return pipe

class map(object):
    # Applies `func` lazily to every item flowing through the pipe.
    def __init__(self, func):
        self.func = func

    def with_input(self, p):
        return pipe(imap(self.func, p.it))
class dict_update(map):
    # Replaces d[field] with fixup(d[field]) when the field is present.
    def __init__(self, field, fixup):
        self.field = field
        self.fixup = fixup

    def func(self, d):
        if self.field in d:
            d_new = d.copy()
            d_new[self.field] = self.fixup(d[self.field])
            return d_new
        return d

def copy(old, new):
    # Copies the value under `old` to a second key `new`.
    def cp(d):
        if old in d:
            dnew = d.copy()
            dnew[new] = d[old]
            return dnew
        return d
    return map(cp)

def fixup_num(*fields):
    # Builds a pipeline that converts each named field to an int.
    p = pipeline()
    for f in fields:
        p = p | dict_update(f, int)
    return p
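
# For example, `fixup_num('size', 'elapsed')` builds
#
#     pipeline([dict_update('size', int), dict_update('elapsed', int)])
#
# and `pipe(records) | fixup_num('size', 'elapsed')` applies each stage to
# the pipe in order via pipeline.with_input.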
class head(object):
    # Passes through at most `count` items, then stops the pipe.
    def __init__(self, count):
        self.count = count
        self.seen = 0

    def with_input(self, p):
        self.it = p.it
        return pipe(self)

    def __iter__(self):
        return self

    def next(self):
        if self.seen == self.count:
            raise StopIteration
        self.seen += 1
        return self.it.next()
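
# A small usage sketch: take the first three items of any pipe.
#
#     for rec in pipe(iter(xrange(10))) | head(3):
#         print rec     # prints 0, 1, 2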
def time_conv(s):
    return datetime.strptime(s, '%d/%m/%Y:%H:%M:%S +0000')

def extract_url_query(d):
    # Splits the request path into a bare URL and a dict of query values;
    # each value is unquoted and split on commas into a list.
    url = d['path']
    dnew = d.copy()
    if '?' in url:
        url, query_s = url.split('?', 1)
        query = {}
        for qv in query_s.split('&'):
            var, val = qv.split('=', 1)
            query[var] = urllib.unquote(val).split(',')
    else:
        query = {}
    del dnew['path']
    dnew['url'] = url
    dnew['query'] = query
    return dnew
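
# For instance, a record whose path is '/search?q=a,b&page=2' comes out as
#
#     url   = '/search'
#     query = {'q': ['a', 'b'], 'page': ['2']}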
def time_bucket(dt):
    # Truncates a datetime to the start of its minute.
    return dt.replace(second=0)

def count(it):
    c = 0
    for x in it:
        c += 1
    return c
# Character classes used to reduce a URL to its rough "shape":
#   L = letters, N = digits, O = anything else.
tags = {4: 'L', 2: 'N', 1: 'O'}

def classify(c):
    a = c.isalpha()
    d = c.isdigit()
    return (int(a) << 2 | int(d) << 1 | int(not (a or d)))

def normalize(s):
    # Collapses runs of same-class characters into a single tag, so URLs
    # that differ only in numeric identifiers map to the same shape.
    new = ''
    last = 0
    for c in s:
        cls = classify(c)
        if cls != last:
            new += tags[cls]
            last = cls
    return new
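
# A quick sketch of the normalized "shape" of a URL:
#
#     normalize('/api/v2/items')  -> 'OLOLNOL'
#     normalize('/item/12345')    -> 'OLON'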
if __name__ == '__main__':
    log = (pipe(file('somedata.log', 'r')) |
           map(lambda x: ENTRY_PAT.match(x).groupdict()) |
           map(extract_url_query) |
           fixup_num('size', 'elapsed') |
           copy('url', 'url_hash') |
           dict_update('url_hash', normalize) |
           dict_update('time', time_conv))

    # for x in log | dict_update('time', time_bucket) | head(5):
    #     print x

    d = {}   # url_hash -> request count
    r = {}   # url_hash -> first URL seen with that shape
    t = {}   # url_hash -> time of the first request with that shape
    for ent in log:
        h = ent['url_hash']
        if h not in r:
            r[h] = ent['url']
            t[h] = ent['time']
        d[h] = d.get(h, 0) + 1

    for k, v in d.items():
        print '%s %-20s %3d %10d %s' % (t[k], k[:20], len(k), v, r[k])

import os
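
# Split reads one chunk ("split") of a large file.  It is a small state
# machine: INIT opens the file, seeks to the chunk and backs up to the
# previous newline; READ hands out complete lines until the chunk's byte
# budget is used up; DONE returns '' forever, like file.readline() at EOF.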
class _SplitState(object):
    def __init__(self, previous_state):
        self.split = previous_state.split
        self.split.state = self
        self.on_enter(previous_state)

    def on_enter(self, previous_state):
        pass

    def on_exit(self):
        pass

    def change_state(self, state_cls):
        self.on_exit()
        return state_cls(self)

    def read_buffer(self, start, size):
        self.file.seek(start)
        self.buffer = self.file.read(size)
        self.pos = 0

    def __repr__(self):
        return self.__class__.__name__
class INIT(_SplitState):
    __slots__ = ['file_name', 'buffer', 'file', 'offset', 'size']

    def on_enter(self, state):
        self.file_name = state.file_name
        self.offset = state.offset
        self.size = state.size

    def open_file(self):
        self.file = open(self.file_name, 'r')

    def find_previous_line(self):
        # Back up from the raw offset to the start of the line that
        # straddles the chunk boundary, and grow the split to cover it.
        if self.offset == 0:
            return
        j = self.offset - 1024
        self.read_buffer(j, 1024)
        i = self.buffer.rfind('\n')
        o = j + i + 1
        self.pos = i + 1
        self.size = self.size + (self.offset - o)
        self.offset = o

    def readline(self):
        self.open_file()
        self.find_previous_line()
        return self.change_state(READ).readline()
class READ(_SplitState):
    def on_enter(self, state):
        self.buffer = state.buffer
        self.offset = state.offset
        self.pos = state.pos
        self.size = state.size
        self.file = state.file
        self.size_read = 0

    def readline(self):
        prefix = []
        size = 0
        while True:
            i = self.buffer.find('\n', self.pos)
            if i != -1:
                break
            size += 1024 - self.pos
            if size + self.size_read > self.size:
                return self.change_state(DONE).readline()
            prefix.append(self.buffer[self.pos:])
            self.read_buffer(self.offset + 1024 - self.pos, 1024)
        if size + self.size_read + i > self.size:
            return self.change_state(DONE).readline()
        line = ''.join(prefix) + self.buffer[self.pos:i+1]
        self.size_read += size + i + 1
        # print prefix
        # print `self.buffer[i-10:i+10]`
        self.pos = i + 1
        return line

    def on_exit(self):
        self.file.close()

    def __repr__(self):
        return '<{0} pos={1.pos}>'.format(
            self.__class__.__name__, self)
class DONE(_SplitState):
    def readline(self):
        return ''

class Split(object):
    def __init__(self, file_name, offset, size):
        # dummy state
        preinit = type('', (object,), {})
        preinit.split = self
        preinit.file_name = file_name
        preinit.offset = offset
        preinit.size = size
        self.state = INIT(preinit)

    def __repr__(self):
        return '<{0} state={1}>'.format(
            self.__class__.__name__,
            self.state)

    def readline(self):
        # print self
        return self.state.readline()
DEFAULT_CHUNK_SIZE = 10 * 1024 * 1024

def adjust_chunk_size(sz_chunks, file_size):
    # Shrink the chunk size until the final chunk is either empty or more
    # than half a chunk, so no worker gets a tiny leftover split.  (The
    # even-division and lower-bound checks avoid looping down to zero.)
    while sz_chunks > 1:
        last_chunk_sz = file_size % sz_chunks
        if last_chunk_sz == 0 or last_chunk_sz > sz_chunks / 2:
            return sz_chunks
        sz_chunks = sz_chunks - 1
    return 1

def size_chunker(sz_chunks, file_size):
    sz_chunks = adjust_chunk_size(sz_chunks, file_size)
    offset = 0
    while offset < file_size:
        n_offset = offset + sz_chunks
        if n_offset > file_size:
            size = file_size - offset
        else:
            size = sz_chunks
        yield offset, size
        offset += size

def count_chunker(nr_chunks, file_size):
    return size_chunker(file_size / nr_chunks, file_size)
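
# A small sketch with hypothetical sizes: for an 11-byte file and a requested
# chunk size of 4, the leftover (3 bytes) is more than half a chunk, so the
# size is kept as-is and the splits are
#
#     list(size_chunker(4, 11))  ->  [(0, 4), (4, 4), (8, 3)]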
def Splitter(file_name, sz_chunks=None, nr_chunks=None):
    if sz_chunks is None and nr_chunks is None:
        sz_chunks = DEFAULT_CHUNK_SIZE
    if sz_chunks is not None and nr_chunks is not None:
        raise Exception('cannot specify both sz_chunks and nr_chunks')
    file_size = os.stat(file_name).st_size
    if sz_chunks is None:
        chunker = count_chunker(nr_chunks, file_size)
    else:
        chunker = size_chunker(sz_chunks, file_size)
    for offset, size in chunker:
        yield Split(file_name, offset, size)

def readit(split):
    return split.readline()
if __name__ == '__main__':
    # for f in Splitter('bad_time.log', nr_chunks=15):
    #     print f
    # split = Split(file_name='bad_time.log', offset=3204065412, size=10470802)
    # print split
    # print `split.readline()`
    # print `split.readline()`
    # print `split.readline()`
    # print split
    from multiprocessing.pool import Pool
    pool = Pool()
    print pool.map(readit, Splitter('bad_time.log'))
    pool.close()
    pool.join()

log = (pipe(file('bad_time.log', 'r')) |
       map(lambda x: ENTRY_PAT.match(x).groupdict()) |
       map(extract_url_query) |
       fixup_num('size', 'elapsed') |
       copy('url', 'url_hash') |
       dict_update('url_hash', normalize) |
       dict_update('time', time_conv))

# for x in log | dict_update('time', time_bucket) | head(5):
#     print x
import json
import sys

class DTEncoder(json.JSONEncoder):
    # Serializes datetime values as strings so log records can be dumped.
    def default(self, o):
        if isinstance(o, datetime):
            return str(o)
        return super(DTEncoder, self).default(o)
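
# For example (str() of a datetime gives '2015-01-28 04:05:00'-style text):
#
#     json.dumps({'time': datetime(2015, 1, 28, 4, 5)}, cls=DTEncoder)
#     -> '{"time": "2015-01-28 04:05:00"}'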
start = None
end = None
for ev in log:
    if start is None:
        # Anchor a 20-minute window (04:00-04:20) on the first event's date.
        d = ev['time']
        start = d.replace(hour=4, minute=0, second=0)
        end = d.replace(hour=4, minute=20, second=0)
    if start <= ev['time'] < end:
        json.dump(ev, sys.stdout, cls=DTEncoder)
        print