-
-
Save tshatrov/3c862eced64b25dd4256c02e985ec1e7 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import random | |
import string | |
import argparse | |
# Alphanumeric alphabet for random line content: a-z, A-Z, 0-9.
CHARSET = string.ascii_lowercase + string.ascii_uppercase + string.digits


def generate_file(f, n_lines, line_length):
    """Write n_lines newline-terminated lines of random characters to f.

    f: a writable text-mode file object.
    n_lines: number of lines to write.
    line_length: number of characters per line (excluding the "\\n").
    """
    # range, not Py2-only xrange: works on both Python 2 and 3.
    for _ in range(n_lines):
        line = "".join(random.choice(CHARSET) for _ in range(line_length)) + "\n"
        f.write(line)
if __name__ == "__main__":
    # CLI entry point: write a file of n_lines random lines of line_length chars.
    cli = argparse.ArgumentParser()
    cli.add_argument("n_lines", type=int, help="Number of lines")
    cli.add_argument("line_length", type=int, help="Line length")
    cli.add_argument("filename", help="File name")
    opts = cli.parse_args()
    with open(opts.filename, "w") as out:
        generate_file(out, opts.n_lines, opts.line_length)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import heapq | |
import argparse | |
import tempfile | |
# Maximum number of lines held in memory for one sorted chunk.
CHUNK_SIZE = 10000
# Number of chunks merged at once; caps simultaneously open file descriptors.
MERGE_LIMIT = 20


def sort_chunk(f, tmp_dir):
    """Read up to CHUNK_SIZE lines from f, sort them, write them to a temp file.

    Returns the temporary file's name, or None when f was already exhausted.
    Lines compare lexicographically as strings; callers are expected to
    supply newline-terminated lines (a final line lacking "\\n" could sort
    inconsistently across merge passes — TODO confirm input always ends with
    a newline).
    """
    lines = []
    # range, not Py2-only xrange: works on both Python 2 and 3.
    for _ in range(CHUNK_SIZE):
        line = f.readline()
        if not line:
            break
        lines.append(line)
    if not lines:
        return None
    # mode="w": NamedTemporaryFile defaults to binary mode, which rejects
    # str lines on Python 3.
    with tempfile.NamedTemporaryFile(mode="w", dir=tmp_dir, delete=False) as tf:
        tf.writelines(sorted(lines))
    return tf.name
def make_chunks(f, tmp_dir):
    """Consume f entirely, producing sorted chunk files in tmp_dir.

    Returns the list of chunk file names (empty when f had no lines).
    """
    chunk_names = []
    while True:
        name = sort_chunk(f, tmp_dir)
        if name is None:
            return chunk_names
        chunk_names.append(name)
class ChunkMerger(object):
    """Context manager performing a k-way merge of sorted chunk files.

    Iterating over an entered ChunkMerger yields every line of every chunk
    file in sorted order. On exit, all chunk files are closed and deleted.
    """

    def __init__(self, chunks):
        self.chunks = chunks  # paths of the sorted chunk files
        self.heap = []        # (line, file_index) pairs, smallest line on top

    def __enter__(self):
        self.fp = [open(chunk, "r") for chunk in self.chunks]
        for idx, f in enumerate(self.fp):
            line = f.readline()
            # Guard: an empty chunk file would otherwise inject "" into the
            # heap, which sorts ahead of every real line.
            if line:
                heapq.heappush(self.heap, (line, idx))
        return self

    def __exit__(self, *args, **kwargs):
        for f in self.fp:
            f.close()
        # Chunk files are intermediate artifacts; remove them once merged.
        for chunk in self.chunks:
            os.remove(chunk)

    def __iter__(self):
        return self

    def __next__(self):
        if not self.heap:
            raise StopIteration
        line, idx = heapq.heappop(self.heap)
        # Refill from the file that produced the smallest line.
        new_line = self.fp[idx].readline()
        if new_line:
            heapq.heappush(self.heap, (new_line, idx))
        return line

    def next(self):
        # Python 2 iterator-protocol alias.
        return self.__next__()
def merge_chunks(chunks, tmp_dir):
    """Merge the given sorted chunk files into a single sorted temp file.

    The input chunk files are deleted (by ChunkMerger's __exit__) and the
    merged file's name is returned.
    """
    # mode="w": NamedTemporaryFile defaults to binary mode, which rejects
    # str lines on Python 3.
    with tempfile.NamedTemporaryFile(mode="w", dir=tmp_dir, delete=False) as tf:
        with ChunkMerger(chunks) as cm:
            for line in cm:
                tf.write(line)
    return tf.name
def sort_file(fin, fout):
    """
    Sort a large file by storing partially-sorted chunks on disk.

    First the input is split into chunks of at most CHUNK_SIZE lines, each
    sorted and written to its own temp file. Then the chunks are repeatedly
    merged, at most MERGE_LIMIT files at a time, until a single sorted file
    remains, which is renamed to fout.
    """
    import shutil  # local import: needed only for the empty-input path

    # Keep temp files next to fout so the final os.rename stays on one filesystem.
    tmp_dir = os.path.dirname(fout)
    with open(fin, "r") as f:
        chunks = make_chunks(f, tmp_dir)
    if not chunks:
        # Empty input: os.copy does not exist (AttributeError); use
        # shutil.copy, and return so chunks[0] below can't raise IndexError.
        shutil.copy(fin, fout)
        return
    while len(chunks) > 1:
        chunks = [
            merge_chunks(chunks[i:i + MERGE_LIMIT], tmp_dir)
            # range, not Py2-only xrange: works on both Python 2 and 3.
            for i in range(0, len(chunks), MERGE_LIMIT)
        ]
    os.rename(chunks[0], fout)
if __name__ == "__main__":
    # CLI entry point: external-merge-sort the input file into the output file.
    cli = argparse.ArgumentParser()
    cli.add_argument("input", help="Input file name")
    cli.add_argument("output", help="Output file name")
    opts = cli.parse_args()
    sort_file(opts.input, opts.output)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment