@tshatrov
Created November 20, 2018 09:27

# Generate a test file of random fixed-length lines.
import random
import string
import argparse

CHARSET = string.ascii_lowercase + string.ascii_uppercase + string.digits


def generate_file(f, n_lines, line_length):
    """Write n_lines random lines of line_length characters to the open file f."""
    for i in range(n_lines):
        line = "".join(random.choice(CHARSET) for j in range(line_length)) + "\n"
        f.write(line)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("n_lines", help="Number of lines", type=int)
    parser.add_argument("line_length", help="Line length", type=int)
    parser.add_argument("filename", help="File name")
    args = parser.parse_args()
    with open(args.filename, "w") as f:
        generate_file(f, args.n_lines, args.line_length)
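
# Hypothetical invocation (the script name is a placeholder, not part of the gist):
#   python generate_testfile.py 1000000 100 big.txt
# writes 1,000,000 lines of 100 random characters each, roughly 100 MB on disk.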


# External sort: sort a text file that may be too large to fit in memory.
import os
import heapq
import shutil
import argparse
import tempfile

# number of lines in each chunk
CHUNK_SIZE = 10000
# number of chunks merged at once; this is the maximum number of file descriptors open simultaneously
MERGE_LIMIT = 20
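# Rough sizing (an illustrative assumption, not from the gist): with 10,000 lines of
# ~100 characters each, a chunk holds about 1 MB of line data in memory before it is
# spilled to disk, so CHUNK_SIZE can usually be raised considerably.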


def sort_chunk(f, tmp_dir):
    """Read up to CHUNK_SIZE lines from f, sort them, and write them to a temporary file.

    Returns the temporary file's name, or None if f is already exhausted.
    """
    heap = []
    for i in range(CHUNK_SIZE):
        line = f.readline()
        if not line:
            break
        heapq.heappush(heap, line)
    if heap:
        # pop lines in sorted order (heapsort) and spill them to a chunk file
        with tempfile.NamedTemporaryFile(mode="w", dir=tmp_dir, delete=False) as tf:
            while heap:
                tf.write(heapq.heappop(heap))
        return tf.name


def make_chunks(f, tmp_dir):
    """Split the open file f into sorted chunk files; return the list of their names."""
    chunks = []
    while True:
        chunk = sort_chunk(f, tmp_dir)
        if chunk is not None:
            chunks.append(chunk)
        else:
            break
    return chunks


class ChunkMerger(object):
    """Merge already-sorted chunk files, yielding lines in globally sorted order."""

    def __init__(self, chunks):
        self.chunks = chunks
        self.heap = []

    def __enter__(self):
        self.fp = [open(chunk, "r") for chunk in self.chunks]
        # seed the heap with the first line of each chunk
        for idx, f in enumerate(self.fp):
            line = f.readline()
            heapq.heappush(self.heap, (line, idx))
        return self

    def __exit__(self, *args, **kwargs):
        for f in self.fp:
            f.close()
        for chunk in self.chunks:
            os.remove(chunk)

    def __iter__(self):
        return self

    def __next__(self):
        if not self.heap:
            raise StopIteration
        # the smallest pending line is always at the top of the heap
        line, idx = heapq.heappop(self.heap)
        new_line = self.fp[idx].readline()
        if new_line:
            heapq.heappush(self.heap, (new_line, idx))
        return line

    def next(self):  # Python 2 compatibility alias
        return self.__next__()


def merge_chunks(chunks, tmp_dir):
    """Merge a group of sorted chunk files into a single sorted chunk; return its name."""
    with tempfile.NamedTemporaryFile(mode="w", dir=tmp_dir, delete=False) as tf:
        with ChunkMerger(chunks) as cm:
            for line in cm:
                tf.write(line)
    return tf.name


def sort_file(fin, fout):
    """
    Sort a large file by storing partially sorted chunks on disk.

    First the file is split into chunks of at most CHUNK_SIZE lines each, and each chunk
    is sorted with heapsort. The sorted chunks are then merged, k-way-merge style, opening
    at most MERGE_LIMIT chunk files at once, until a single sorted file remains.
    """
    tmp_dir = os.path.dirname(fout)
    with open(fin, "r") as f:
        chunks = make_chunks(f, tmp_dir)
    if not chunks:
        # empty input: nothing to sort, just copy it over
        shutil.copyfile(fin, fout)
        return
    # repeatedly merge groups of at most MERGE_LIMIT chunks until one chunk remains
    while len(chunks) > 1:
        chunks = [
            merge_chunks(chunks[i:i + MERGE_LIMIT], tmp_dir)
            for i in range(0, len(chunks), MERGE_LIMIT)
        ]
    os.rename(chunks[0], fout)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("input", help="Input file name")
    parser.add_argument("output", help="Output file name")
    args = parser.parse_args()
    sort_file(args.input, args.output)
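
# A minimal end-to-end check (a sketch; "generate_testfile.py" and "external_sort.py"
# are placeholder names for the two snippets above, not names used in the gist):
#
#   python generate_testfile.py 100000 80 unsorted.txt
#   python external_sort.py unsorted.txt sorted.txt
#
# The result can then be verified from Python, since the merge compares whole lines
# the same way sorted() does:
#
#   with open("sorted.txt") as f:
#       lines = f.readlines()
#   assert lines == sorted(lines)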