Hash-partition some number of files... to be patched up some.
#!/usr/bin/python
import argparse
import errno
import hashlib
import json
import multiprocessing
import os
import re
import resource
import subprocess
import urllib


def claim_limit():
    """Raise the soft limit on open files to the hard limit and return it."""
    _, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard))
    return hard


class Partitioner(object):
    """
    Accepts a list of regexes and partitions lines based on those.
    Uses the same number of workers as cores on the box; each worker gets
    a number of files equal to nofile_max * some_safety_fraction / num_workers,
    so a file is split into approximately 10 x num_workers pieces.
    """

    def __init__(self, hash_patterns=None, sort_patterns=None, skip_patterns=None,
                 prefix='split', outdir='parsed', worker_id=None, naming_function=None):
        assert hash_patterns is not None
        self.naming_function = naming_function
        self.prefix = prefix
        self.outdir = outdir
        safety_margin = 0.5
        self.num_files = int(claim_limit() * safety_margin / multiprocessing.cpu_count())
        if worker_id is None:
            self.worker_id = os.getpid()  # might want to pass this in
        else:
            self.worker_id = worker_id
        self.files = list()
        maxlen = len(str(self.num_files))
        format_pattern = "{{:0{}}}".format(maxlen)
        for i in xrange(self.num_files):
            ifilepath = os.path.join(prefix, format_pattern.format(i), str(self.worker_id))
            ifileprefix = os.path.dirname(ifilepath)
            if not os.path.isdir(ifileprefix):
                try:
                    os.makedirs(ifileprefix)
                except OSError as oops:
                    if oops.errno != errno.EEXIST:  # a race with another worker is fine
                        raise
            ifile = open(ifilepath, 'w')
            self.files.append(ifile)
        self.regexes = Partitioner.compile_regexes(hash_patterns)
        self.sort_regexes = Partitioner.compile_regexes(sort_patterns)
        self.omit_regexes = Partitioner.compile_regexes(skip_patterns)

    @staticmethod
    def compile_regexes(pattern_list):
        """Compile a list of regex patterns; None or an empty list yields []."""
        if not pattern_list:  # deals with possible Nones
            return list()
        return [re.compile(pattern) for pattern in pattern_list]

    @staticmethod
    def line_matches(line, patterns):
        res = list()
        for pat in patterns:
            match = pat.search(line)
            if match is None:
                res.append("NO_KEY")
            else:
                res.append(match.groups()[0])  # append the first capture group
        return tuple(res)

    def get_hash_and_keys(self, line):
        matches = Partitioner.line_matches(line, self.regexes)
        extra_sort_keys = Partitioner.line_matches(line, self.sort_regexes)
        hashval = abs(hash(matches))
        return (hashval % self.num_files, matches, extra_sort_keys)
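    # For a hypothetical line 'GET /track?i=alpha&d=beta&t=123& ...' with the
    # patterns in main() below, matches == ("alpha", "beta") and
    # extra_sort_keys == ("123",), so the line lands in bucket
    # abs(hash(("alpha", "beta"))) % self.num_files.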

    def should_skip_line(self, line):
        # skip everything which matches one of the skip patterns
        for pat in self.omit_regexes:
            if pat.search(line) is not None:
                return True
        return False

    def partition_file(self, filename):
        with open(filename) as infile:
            for line in infile:
                if self.should_skip_line(line):
                    continue
                linehash, hashkeys, extra_sort_keys = self.get_hash_and_keys(line)
                # record format: json(hash_keys) \t json(sort_keys) \t original line
                print >> self.files[linehash], "\t".join((json.dumps(hashkeys),
                                                          json.dumps(extra_sort_keys),
                                                          line.strip()))
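    # A hypothetical bucket record therefore looks like
    #   ["alpha", "beta"]\t["123"]\t<original line>
    # so a plain `sort` over the bucket groups identical hash keys together.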

    def partition_files(self, filenames):
        for filename in filenames:
            self.partition_file(filename)

    def __enter__(self):
        return self

    def sort_and_split_files(self):
        for ifile in self.files:
            if not os.path.exists(ifile.name):
                continue
            sorted_filename = "{}.sorted".format(ifile.name)
            last_hash_key_string = None
            last_hash_key_parsed = None
            outfile = None
            with open(sorted_filename, 'w') as sorted_file:
                sort_proc = subprocess.Popen(['sort', ifile.name], stdout=subprocess.PIPE, bufsize=-1)
                for line in sort_proc.stdout:
                    # split on the first two tabs only; the original line may itself contain tabs
                    ihash_keys, isort_keys, original_line = line.split('\t', 2)
                    if ihash_keys != last_hash_key_string:  # on key change
                        last_hash_key_string = ihash_keys
                        last_hash_key_parsed = json.loads(ihash_keys)
                        dirname, filename = self.naming_function(last_hash_key_parsed)
                        full_dirname = os.path.join(self.outdir, dirname)
                        if not os.path.isdir(full_dirname):
                            try:
                                os.makedirs(full_dirname)
                            except OSError as oops:
                                if oops.errno != errno.EEXIST:  # the directory already existing is fine
                                    raise
                            except Exception:
                                print dirname
                                dirname = "WEIRD_THINGS"
                                full_dirname = os.path.join(self.outdir, dirname)
                                os.makedirs(full_dirname)
                        if outfile is not None:
                            outfile.close()  # close the previous key's output file
                        try:
                            outfile = open(os.path.join(full_dirname, filename), 'w')
                        except (IOError, OSError):  # e.g. unrepresentable characters in the name
                            filename = 'WEIRD_THINGS'
                            outfile = open(os.path.join(full_dirname, filename), 'w')
                    outfile.write('\t'.join((isort_keys, original_line)))
                if outfile is not None:
                    outfile.close()  # close the last key's output file
                sort_proc.wait()

    def clean_up_empty_files(self):
        for ifile in self.files:
            ifile.close()
            # remove the file if empty..., could have just created them lazily in the first place...
            if 0 == os.stat(ifile.name).st_size:
                os.remove(ifile.name)

    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type:
            return False  # propagate the original exception instead of re-raising just its type
        self.clean_up_empty_files()  # remove any empty files
        self.sort_and_split_files()  # sort each file
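    # Lifecycle note: partition_files() writes raw buckets inside the with-block;
    # on a clean exit, __exit__ drops empty buckets, then sorts and splits the rest.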


def remove_empty_dirs(prefix=None):
    assert prefix is not None
    for root, dirs, files in os.walk(prefix):
        if not files and not dirs:
            os.removedirs(root)


def main():
    # split into different files based on these patterns
    patterns = list()
    patterns.append("[&?]i=([^&]*)&")
    patterns.append("&d=([^&]*)&")
    # sort within files by these patterns
    sort_keys = ["&t=([^&]*)&"]
    # don't emit lines matching these patterns
    skip_keys = [' "GET /empty_flash\?tracer=']  # , ' "GET /empty_flash '] # it's probably easier to just end up with NO_KEY folders than regexing twice...

    def naming_function(hash_keys):
        i_key, d_key = hash_keys
        d_key = urllib.unquote_plus(d_key)
        try:
            str(d_key)
        except Exception:
            d_key = 'weird_characters'
        if len(d_key.encode('utf-8')) + len(i_key) > 200:
            # key too long for a path component: fall back to its md5 digest
            try:
                d_key = hashlib.md5(d_key.encode('utf-8')).hexdigest()
            except Exception as oops:
                print oops
                try:
                    print d_key
                except Exception as oops:
                    d_key = 'did_not_get_one'
                    print oops
        dirname = os.path.join(i_key, d_key)
        filename = str(os.getpid())
        return dirname, filename
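    # For hypothetical keys ("alpha", "beta%20gamma") this returns
    # ("alpha/beta gamma", "<pid>"), so records land under
    # parsed/alpha/beta gamma/<pid>.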

    # parse arguments before opening hundreds of bucket files
    parser = argparse.ArgumentParser()
    parser.add_argument("infiles", nargs="+")
    args = vars(parser.parse_args())
    with Partitioner(hash_patterns=patterns,
                     sort_patterns=sort_keys,
                     skip_patterns=skip_keys,
                     prefix='split',
                     naming_function=naming_function) as myp:
        myp.partition_files(args['infiles'])
    # remove_empty_dirs(prefix='split')


if "__main__" == __name__:
    main()
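
# Usage sketch (hypothetical file names): partition two raw log files, then
# look for the per-key output under parsed/<i_key>/<d_key>/<pid>:
#   python hash_partition.py access.log.0 access.log.1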