Hash-partition some number of files... to be patched up some.
#!/usr/bin/python
import argparse
import errno
import hashlib
import json
import multiprocessing
import os
import re
import resource
import subprocess
import urllib
def claim_limit():
    """Raise the soft limit on open files to the hard limit and return it."""
    _, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard))
    return hard
class Partitioner(object):
    """
    Accept an array of regexes and partition input lines based on those.
    Use the same number of workers as cores on the box;
    each worker gets a number of files equal to nofile_max * some_safety_fraction / num_workers,
    so a file is split into approximately 10 x num_workers pieces.
    """
    def __init__(self, hash_patterns=None, sort_patterns=None, skip_patterns=None,
                 prefix='split', outdir='parsed', worker_id=None, naming_function=None):
        assert hash_patterns is not None
        self.naming_function = naming_function
        self.prefix = prefix
        self.outdir = outdir
        safety_margin = 0.5  # only claim half the descriptor limit per worker
        self.num_files = int(claim_limit() * safety_margin / multiprocessing.cpu_count())
        if worker_id is None:
            self.worker_id = os.getpid()  # might want to pass this in
        else:
            self.worker_id = worker_id
        self.files = list()
        maxlen = len(str(self.num_files))
        format_pattern = "{{:0{}}}".format(maxlen)  # zero-pad bucket names, e.g. "{:03}"
        for i in xrange(self.num_files):
            ifilepath = os.path.join(prefix, format_pattern.format(i), str(self.worker_id))
            ifileprefix = os.path.dirname(ifilepath)
            if not os.path.isdir(ifileprefix):
                try:
                    os.makedirs(ifileprefix)
                except OSError as oops:
                    if oops.errno != errno.EEXIST:  # losing a race with another worker is fine
                        raise
            self.files.append(open(ifilepath, 'w'))
        self.regexes = Partitioner.compile_regexes(hash_patterns)
        self.sort_regexes = Partitioner.compile_regexes(sort_patterns)
        self.omit_regexes = Partitioner.compile_regexes(skip_patterns)
    @staticmethod
    def compile_regexes(pattern_list):
        """Compile each pattern string; None or an empty list yields an empty list."""
        if not pattern_list:
            return list()
        return [re.compile(pattern) for pattern in pattern_list]
    @staticmethod
    def line_matches(line, patterns):
        """Return a tuple holding each pattern's first capture group, or NO_KEY."""
        res = list()
        for pat in patterns:
            match = pat.search(line)
            if match is None:
                res.append("NO_KEY")
            else:
                res.append(match.groups()[0])  # the value of the first capture group
        return tuple(res)
    def get_hash_and_keys(self, line):
        """Map a line to (bucket index, hash key tuple, extra sort key tuple)."""
        matches = Partitioner.line_matches(line, self.regexes)
        extra_sort_keys = Partitioner.line_matches(line, self.sort_regexes)
        return (abs(hash(matches)) % self.num_files, matches, extra_sort_keys)
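    # For example (hypothetical values): with hash patterns for i= and d=,
    # a line containing '?i=abc&d=xyz&' yields matches == ("abc", "xyz"),
    # and abs(hash(("abc", "xyz"))) % self.num_files picks its bucket file.
    # This relies on Python 2's deterministic hash(); under hash randomization
    # bucket numbers would differ between runs, though lines with equal keys
    # still always land in the same bucket within a run.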
    def should_skip_line(self, line):
        """Skip over everything which matches one of the skip patterns."""
        for pat in self.omit_regexes:
            if pat.search(line) is not None:
                return True
        return False
    def partition_file(self, filename):
        """Write each kept line to its bucket file, prefixed with its keys."""
        with open(filename) as infile:
            for line in infile:
                if self.should_skip_line(line):
                    continue
                linehash, hashkeys, extra_sort_keys = self.get_hash_and_keys(line)
                print >> self.files[linehash], "\t".join((json.dumps(hashkeys),
                                                          json.dumps(extra_sort_keys),
                                                          line.strip()))
    def partition_files(self, filenames):
        for filename in filenames:
            self.partition_file(filename)
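    # Each bucket line looks like (hypothetical keys shown):
    #   ["abc", "xyz"]\t["1449884000"]\t<original line, stripped>
    # so sorting a bucket with sort(1) groups equal hash keys together
    # and orders each group by its JSON-encoded sort keys.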
    def __enter__(self):
        return self
    def sort_and_split_files(self):
        """Sort each bucket with sort(1), then split it into one file per hash key."""
        for ifile in self.files:
            if not os.path.exists(ifile.name):
                continue  # bucket was removed as empty
            last_hash_key_string = None
            outfile = None
            sort_proc = subprocess.Popen(['sort', ifile.name], stdout=subprocess.PIPE, bufsize=-1)
            for line in sort_proc.stdout:
                # maxsplit of 2 keeps any tabs inside the original line intact
                ihash_keys, isort_keys, original_line = line.split('\t', 2)
                if ihash_keys != last_hash_key_string:  # on key change
                    last_hash_key_string = ihash_keys
                    last_hash_key_parsed = json.loads(ihash_keys)
                    dirname, filename = self.naming_function(last_hash_key_parsed)
                    full_dirname = os.path.join(self.outdir, dirname)
                    if not os.path.isdir(full_dirname):
                        try:
                            os.makedirs(full_dirname)
                        except OSError as oops:
                            if oops.errno != errno.EEXIST:  # the directory already exists
                                raise
                        except Exception:
                            # keys with unrepresentable names land in a catch-all directory
                            print dirname
                            dirname = "WEIRD_THINGS"
                            full_dirname = os.path.join(self.outdir, dirname)
                            os.makedirs(full_dirname)
                    if outfile is not None:
                        outfile.close()  # close the previous key's file
                    try:
                        outfile = open(os.path.join(full_dirname, filename), 'w')
                    except EnvironmentError:
                        filename = 'WEIRD_THINGS'
                        outfile = open(os.path.join(full_dirname, filename), 'w')
                outfile.write('\t'.join((isort_keys, original_line)))
            if outfile is not None:
                outfile.close()  # close the final key's file
            sort_proc.wait()
    def clean_up_empty_files(self):
        """Close every bucket file and remove the ones nothing was written to."""
        for ifile in self.files:
            ifile.close()
            # remove the file if empty; they could have been made lazily in the first place
            if 0 == os.stat(ifile.name).st_size:
                os.remove(ifile.name)
    def __exit__(self, exc_type, exc_value, exc_traceback):
        if exc_type:
            return False  # propagate the original exception with its traceback
        self.clean_up_empty_files()  # remove any empty files
        self.sort_and_split_files()  # sort and split each remaining file
def remove_empty_dirs(prefix=None):
    assert prefix is not None
    for root, dirs, files in os.walk(prefix):
        if not files and not dirs:
            os.removedirs(root)
def main():
    # split into different files based on these patterns
    patterns = list()
    patterns.append("[&?]i=([^&]*)&")
    patterns.append("&d=([^&]*)&")
    # sort within files by these patterns
    sort_keys = ["&t=([^&]*)&"]
    # don't emit lines matching these patterns (it's probably easier to just
    # end up with NO_KEY folders than to regex twice)
    skip_keys = [' "GET /empty_flash\?tracer=']  # could also add: ' "GET /empty_flash '
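    # A hypothetical matching log line, for illustration only:
    #   1.2.3.4 - - [12/Dec/2015] "GET /track?i=abc&d=some+domain&t=1449884000&..."
    # hashes on its i= and d= values, sorts on t=, and would be skipped
    # entirely if it were an empty_flash tracer request.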
    def naming_function(hash_keys):
        """Map a parsed hash key tuple to (dirname, filename) for its output file."""
        i_key, d_key = hash_keys
        d_key = urllib.unquote_plus(d_key)
        try:
            str(d_key)
        except Exception:
            d_key = 'weird_characters'
        if len(d_key.encode('utf-8')) + len(i_key) > 200:
            # hash over-long keys so directory names stay under filesystem limits
            try:
                d_key = hashlib.md5(d_key.encode('utf-8')).hexdigest()
            except Exception as oops:
                print oops
                try:
                    print d_key
                except Exception as oops:
                    d_key = 'did_not_get_one'
                    print oops
        dirname = os.path.join(i_key, d_key)
        filename = str(os.getpid())
        return dirname, filename
    parser = argparse.ArgumentParser()
    parser.add_argument("infiles", nargs="+")
    args = vars(parser.parse_args())
    with Partitioner(hash_patterns=patterns,
                     sort_patterns=sort_keys,
                     skip_patterns=skip_keys,
                     prefix='split',
                     naming_function=naming_function) as myp:
        myp.partition_files(args['infiles'])
    # remove_empty_dirs(prefix='split')
if "__main__" == __name__:
    main()
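# Usage sketch (hypothetical input names):
#   ./hash_partition.py access.log.0 access.log.1
# Intermediate buckets land under split/<bucket>/<pid>; final output lands
# under parsed/<i_key>/<d_key>/<pid>, one file per hash key, each sorted
# by its t= sort key.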