Skip to content

Instantly share code, notes, and snippets.

@ketralnis
Created January 3, 2011 19:58
Show Gist options
  • Save ketralnis/763863 to your computer and use it in GitHub Desktop.
Save ketralnis/763863 to your computer and use it in GitHub Desktop.
simple parallel sort
#!/usr/bin/env python
import os
import re
import sys
import tempfile
import multiprocessing
from optparse import OptionParser
from subprocess import Popen, PIPE, STDOUT
# Toggled by the --debug command-line flag in main(); gates status() output.
debug = False


def status(s, **kw):
    """Emit a diagnostic line on stderr, but only while debugging is enabled.

    When keyword arguments are supplied, *s* is first treated as a
    %-style format string and interpolated with them.
    """
    if not debug:
        return
    message = (s % kw) if kw else s
    sys.stderr.write('%s\n' % (message,))
# <number>[k|m|g][b], e.g. "10m", "512K", "1.5g", "4096b" (case-insensitive).
buffer_size_re = re.compile(r'^([0-9.]+)([kmg]?)b?$', re.IGNORECASE)


def parse_size_str(s):
    """Parse a human-readable size such as ``"10m"`` into a byte count.

    Units are powers of 1024 (k, m, g), case-insensitive, and may be
    followed by an optional ``b``. The number may be fractional
    (``"1.5k"`` -> 1536); the result is truncated to whole bytes.

    Raises Exception("Can't parse ...") if *s* is not a valid size.
    """
    match = buffer_size_re.match(s)
    if not match:
        # Report the string we were actually given (the old message used an
        # undefined global, `options`, and so raised NameError instead).
        raise Exception("Can't parse %r" % (s,))
    num_str, unit = match.groups()
    try:
        # Keep integer strings exact; only fall back to float for fractions.
        number = float(num_str) if '.' in num_str else int(num_str)
    except ValueError:
        # Inputs like "1.2.3" or "." slip through the [0-9.]+ class.
        raise Exception("Can't parse %r" % (s,))
    multipliers = {
        '': 1,
        'k': 1024,
        'm': 1024 * 1024,
        'g': 1024 * 1024 * 1024,
    }
    # int() instead of the Python-2-only long(): int promotes as needed.
    return int(number * multipliers[unit.lower()])
class PendingWset(object):
    """Bounded pool of chunk sorts, followed by a final ``sort -m`` merge.

    Each appended Wset is told to begin sorting (its stdin is closed) only
    once fewer than ``num_procs`` Wsets are still sorting. ``dump_sorted``
    waits for the remaining sorts and then merges every sorted chunk.
    """

    def __init__(self, sort_binary, num_procs=1, sort_args=None):
        if num_procs < 1:
            # With no free slots, append() could never start a sort.
            raise ValueError("num_procs must be at least 1, got %r"
                             % (num_procs,))
        # pid -> Wset that has begun sorting but has not been reaped yet
        self.wsets = {}
        # Wsets whose sort has completed (or is being waited on for cleanup)
        self.finished_wsets = []
        self.num_procs = num_procs
        # Copy so neither a shared default nor the caller's list is aliased.
        self.sort_args = list(sort_args) if sort_args is not None else []
        self.sort_binary = sort_binary

    def append(self, wset):
        """Begin sorting *wset*, first waiting for a free slot if needed.

        Raises Exception if a previously started sort exits non-zero or if
        an unexpected child process is reaped.
        """
        if len(self.wsets) >= self.num_procs:
            # we've launched the max number of processes, so first
            # check for those that have already finished to remove
            # them
            for w in list(self.wsets.values()):
                ret = w.poll()
                if ret is None:
                    continue
                if ret != 0:
                    raise Exception("Sorter completed with an error")
                self.wsets.pop(w.pid)
                self.finished_wsets.append(w)
        while len(self.wsets) >= self.num_procs:
            # we're already running the max number and need to wait
            # for one to finish before starting another.
            status("Waiting early for sort process (%d/%d)"
                   % (len(self.wsets), self.num_procs))
            pid, estatus = os.wait()
            if estatus != 0:
                raise Exception("Sort completed with an error")
            if pid not in self.wsets:
                # os.wait() reaps *any* child (e.g. the sort behind *wset*
                # itself if it died early). Raise explicitly: an assert
                # would vanish under `python -O`.
                raise Exception("Reaped unexpected child process %d" % pid)
            self.finished_wsets.append(self.wsets.pop(pid))
        status("Beginning sort on %r of %d bytes" % (wset, wset.size))
        wset.begin_sort()
        self.wsets[wset.pid] = wset

    def dump_sorted(self):
        """Wait for all pending sorts, then merge every chunk to stdout.

        Runs ``sort_binary sort_args -m <chunk files...>`` with stdout
        inherited, so the merged output goes straight to our stdout. Every
        chunk's temporary file is cleaned up afterwards, even on failure.

        Raises Exception if any chunk sort or the merge exits non-zero.
        """
        try:
            while self.wsets:
                _, wset = self.wsets.popitem()
                # Record it first so the finally clause cleans it up even
                # if its sort failed.
                self.finished_wsets.append(wset)
                status('Waiting for %r' % wset)
                if wset.wait() != 0:
                    raise Exception("Sort completed with an error")
            if not self.finished_wsets:
                # Empty input. With no file operands `sort -m` would read
                # our stdin instead, which can block forever.
                return
            cmd = ([self.sort_binary]
                   + self.sort_args
                   + ['-m']
                   + [w.tf.name for w in self.finished_wsets])
            status('Starting %r' % cmd)
            # will spit to stdout on its own
            merge = Popen(cmd)
            if merge.wait() != 0:
                raise Exception("Merge completed with an error")
        finally:
            for wset in self.finished_wsets + list(self.wsets.values()):
                wset.cleanup()
class Wset(object):
    """A single input chunk fed to its own ``sort`` child process.

    The child is spawned as soon as the Wset is created, with its stdout
    redirected into a NamedTemporaryFile. Data is written to the child's
    stdin through ``append``; ``begin_sort`` closes that stdin so the child
    sees end-of-input, and ``cleanup`` closes the temporary file.
    """

    def __init__(self, sort_binary, sort_args=None, temp_dir=None):
        # The sort child writes straight into this file's descriptor; the
        # merge step reads it back later by name.
        self.tf = tempfile.NamedTemporaryFile(mode='w', dir=temp_dir)
        # None instead of a shared mutable default list.
        cmd = [sort_binary] + list(sort_args or [])
        status("Starting %r to %r" % (cmd, self.tf.name))
        self.proc = Popen(cmd, stdin=PIPE, stdout=self.tf)
        # total len() of everything written to the child so far
        self.size = 0
        status('Starting %r' % self)

    def __repr__(self):
        return "<Wset(%d, %r)>" % (self.size, id(self))

    @property
    def pid(self):
        """Process id of the sort child."""
        return self.proc.pid

    @property
    def returncode(self):
        """The child's exit status, or None if it has not been reaped."""
        return self.proc.returncode

    def append(self, line):
        """Write *line* (possibly many lines) to the sort child's stdin."""
        self.proc.stdin.write(line)
        self.size += len(line)

    def begin_sort(self):
        """Close the child's stdin so it can finish sorting; return its pid."""
        status('Starting sort on %r' % self)
        self.proc.stdin.close()
        return self.proc.pid

    def wait(self):
        """Block until the sort child exits and return its exit status."""
        status('Waiting on %r' % self)
        return self.proc.wait()

    def poll(self):
        """Return the child's exit status, or None if it is still running."""
        return self.proc.poll()

    def cleanup(self):
        """Close (and thereby delete) the temporary output file."""
        self.tf.close()
def main():
    """Command-line entry point: sort stdin (or the named files) in parallel.

    The input is read in chunks of roughly ``--buffer-size`` bytes. Each
    chunk is fed to its own ``sort`` process (at most ``--num-procs``
    sorting at once), and the sorted chunks are then merged with
    ``sort -m``, whose output goes to our stdout.
    """
    global debug
    # otherwise 'sort' wants to respect our locale and that's just
    # silly. LC_ALL takes precedence over LANG, so set both; otherwise a
    # user's LC_ALL would still win.
    os.environ['LANG'] = 'C'
    os.environ['LC_ALL'] = 'C'
    parser = OptionParser()
    parser.add_option('-N', '--num-procs', dest='num_procs',
                      default=multiprocessing.cpu_count(), type='int')
    parser.add_option('--debug', dest='debug',
                      action='store_true', default=False)
    parser.add_option('--binary', dest='sort_binary',
                      default='/usr/bin/sort',
                      help="path to the 'sort' binary. Must accept arguments -T, -S, -m")
    parser.add_option('-S', '--buffer-size', dest='buffer_size',
                      default='10m',
                      help="the buffer size to be used by each sort process")
    parser.add_option('-T', '--temporary-directory', dest='temp_dir',
                      default=None)
    parser.add_option('-n', dest='numeric', action='store_true',
                      default=False)
    parser.add_option('-r', dest='reverse', action='store_true',
                      default=False)
    parser.add_option('-k', dest='sort_key', default=[], action='append')
    options, rest = parser.parse_args()
    debug = options.debug
    sort_binary = options.sort_binary

    if options.num_procs < 1:
        # A pool with no slots can never start a sort.
        parser.error('--num-procs must be at least 1')

    buffer_size = parse_size_str(options.buffer_size)
    if buffer_size < 1:
        # f.read(0) returns an empty string, which the read loop below
        # treats as end-of-input, so nothing would be sorted at all.
        parser.error('--buffer-size must be at least 1 byte')

    if rest:
        # One file object per path. The old `[map(open, rest)]` built a
        # single nested list, which has no .read().
        files = [open(path) for path in rest]
    else:
        files = [sys.stdin]

    sort_args = []
    # extra kilobyte for the remainder of the line that we probably
    # read in the middle of
    sort_args.append('-S%db' % (buffer_size + 1024))
    if options.temp_dir:
        sort_args.append('-T%s' % options.temp_dir)
    if options.numeric:
        sort_args.append('-n')
    if options.reverse:
        sort_args.append('-r')
    for key in options.sort_key:
        # Option and value as separate argv entries, not one "-k <key>"
        # string with an embedded space.
        sort_args.extend(['-k', key])

    full_wsets = PendingWset(sort_binary=sort_binary,
                             num_procs=options.num_procs,
                             sort_args=sort_args)
    try:
        for f in files:
            while True:
                b = f.read(buffer_size)
                if not b:
                    break
                wset = Wset(sort_binary=sort_binary,
                            sort_args=sort_args,
                            temp_dir=options.temp_dir)
                wset.append(b)
                # Finish the line we stopped in the middle of, so that no
                # line is split across two chunks.
                if b[-1] != '\n':
                    wset.append(f.readline())
                full_wsets.append(wset)
    finally:
        for f in files:
            if f is not sys.stdin:
                f.close()
    full_wsets.dump_sorted()


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment