Skip to content

Instantly share code, notes, and snippets.

@ikostia
Created December 7, 2014 00:14
Embed
What would you like to do?
#! /usr/bin/python
import os, sys
import heapq
class ExternalSorter(object):
    """Sort a text file of integers (one per line) with an external merge sort.

    The input is read in runs of at most ``CHUNK_SIZE`` integers.  Each run
    is sorted in memory and written to its own file named from
    ``CHUNK_FILE_NAME``; the sorted runs are then merged into a file called
    ``sorted.<input basename>`` located in the same directory as the input.

    All of the work is done by the constructor.  The intermediate chunk
    files are left on disk afterwards.
    """

    # Maximum number of integers kept in memory for one sorted run.
    CHUNK_SIZE = 10*1000*1000
    # printf-style template for chunk file paths; filled with the chunk index.
    CHUNK_FILE_NAME = "chunks/chunk.%.5i.txt"

    def __init__(self, fname):
        """Sort the file at *fname* immediately.

        Raises ``OSError`` if *fname* does not exist (from ``getsize``) and
        ``ValueError`` if a non-blank line is not a valid integer.
        """
        self.fname = fname
        self.fsize = os.path.getsize(self.fname)
        self.fobj = None
        self.chunk_num = 0
        self.sort_initial_chunks()
        self.k_way_merge()

    def read_next_element(self, fobj):
        """Return the next integer from *fobj*, or ``None`` at end of file.

        Whitespace-only lines are skipped so that a blank or trailing empty
        line does not abort the sort with a ``ValueError``.
        """
        while True:
            line = fobj.readline()
            if not line:
                return None
            if line.strip():
                return int(line)

    def process_chunk(self, chunk, chunk_len, ind):
        """Sort the first *chunk_len* items of *chunk* and write them out.

        The sorted prefix is stored back into *chunk* in place and written,
        one value per line, to the chunk file with index *ind*.  The file's
        path is appended to ``self.chunkfiles``.

        Raises ``IndexError`` if *chunk_len* is not within
        ``0..len(chunk)``; nothing is written in that case.
        """
        if chunk_len < 0 or chunk_len > len(chunk):
            raise IndexError("chunk_len %d is outside 0..%d"
                             % (chunk_len, len(chunk)))
        if chunk_len < len(chunk):
            ordered = sorted(chunk[:chunk_len])
            chunk[:chunk_len] = ordered
        else:
            chunk.sort()
            ordered = chunk
        fname = self.CHUNK_FILE_NAME % ind
        # The template points into a sub-directory that may not exist yet.
        chunk_dir = os.path.dirname(fname)
        if chunk_dir and not os.path.isdir(chunk_dir):
            os.makedirs(chunk_dir)
        self.chunkfiles.append(fname)
        with open(fname, 'w') as chunkfile:
            chunkfile.writelines(str(value) + '\n' for value in ordered)

    def sort_initial_chunks(self):
        """Split the input into sorted chunk files.

        Resets ``self.chunkfiles`` and ``self.chunk_num``; on return
        ``self.chunk_num`` is the number of chunk files written.
        """
        self.chunkfiles = []
        self.chunk_num = 0
        # Grow the buffer on demand instead of preallocating CHUNK_SIZE
        # slots, so small inputs do not pay for a huge list.
        chunk = []
        with open(self.fname, 'r') as self.fobj:
            while True:
                el = self.read_next_element(self.fobj)
                if el is None:
                    break
                chunk.append(el)
                if len(chunk) >= self.CHUNK_SIZE:
                    self.process_chunk(chunk, len(chunk), self.chunk_num)
                    self.chunk_num += 1
                    chunk = []
        # Flush the final, partially filled run (if any).
        if chunk:
            self.process_chunk(chunk, len(chunk), self.chunk_num)
            self.chunk_num += 1

    def read_from_next_chunk(self, chunkfobjs, ind):
        """Read one integer from the first non-exhausted chunk at or after *ind*.

        Chunks are probed cyclically starting at ``ind % len(chunkfobjs)``.
        A chunk that turns out to be exhausted is closed and skipped.

        Returns ``(element, chunk_index)``, or ``(None, None)`` once every
        chunk is exhausted (or *chunkfobjs* is empty).

        Kept for backward compatibility; ``k_way_merge`` no longer calls it.
        """
        total = len(chunkfobjs)
        for offset in range(total):
            current = (ind + offset) % total
            fobj = chunkfobjs[current]
            if fobj.closed:
                continue
            el = self.read_next_element(fobj)
            if el is None:
                fobj.close()
                continue
            return el, current
        return None, None

    def _iter_elements(self, fobj):
        """Yield the integers remaining in *fobj* until end of file."""
        while True:
            el = self.read_next_element(fobj)
            if el is None:
                return
            yield el

    def k_way_merge(self):
        """Merge all chunk files into ``sorted.<input basename>``.

        The result is written next to the input file, one integer per line
        in ascending order.  Every chunk file and the result file are closed
        even if the merge fails part-way.
        """
        # Prefix the basename only, so inputs given with a directory part
        # ("dir/data.txt") produce "dir/sorted.data.txt".
        result_path = os.path.join(os.path.dirname(self.fname),
                                   'sorted.' + os.path.basename(self.fname))
        chunkfobjs = []
        try:
            with open(result_path, 'w') as resultfobj:
                for fname in self.chunkfiles:
                    chunkfobjs.append(open(fname, 'r'))
                streams = [self._iter_elements(fobj) for fobj in chunkfobjs]
                # heapq.merge lazily merges already-sorted iterables.
                for el in heapq.merge(*streams):
                    resultfobj.write(str(el) + '\n')
        finally:
            for fobj in chunkfobjs:
                fobj.close()
if __name__ == "__main__":
    # Sort the file named by the first command-line argument, falling back
    # to "data.txt" when no argument is given.
    if len(sys.argv) > 1:
        fname = sys.argv[1]
    else:
        fname = "data.txt"
    sorter = ExternalSorter(fname)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment