@mkowoods
Last active June 20, 2019 19:14
An example of how to use map-reduce logic (an external merge sort) to sort a file that is larger than memory
import heapq
import random

random.seed(42)
F_PATH = "data.csv"
OUTPUT_PATH_TEMPLATE = "tmp_%d.csv"
CHUNK_SIZE = 5
CHUNK_CTR = 0
BUFFER = []
def write_lines(f_obj, data, new_line_char="\n"):
    """Write data, an iterable of strings, to the open file object f_obj."""
    f_obj.write(new_line_char.join(data))
def write_chunk(data, chunk_iter):
    """Write one sorted chunk of records to its own temp file."""
    out_path = OUTPUT_PATH_TEMPLATE % chunk_iter
    print(data)
    with open(out_path, 'w') as f:
        write_lines(f, data)
    print('Wrote:', out_path)
def clear_buffer():
    global BUFFER
    BUFFER = []

def update_ctr():
    global CHUNK_CTR
    CHUNK_CTR += 1

def sort_buffer_and_dump_to_disk():
    """Sort the in-memory buffer, write it out as a chunk, then reset."""
    BUFFER.sort()
    write_chunk(BUFFER, CHUNK_CTR)
    clear_buffer()
    update_ctr()
# create FAKE shuffled data so the example is self-contained
with open(F_PATH, 'w') as f_fake:
    data = ["This is line %02d" % i for i in range(25)]
    random.shuffle(data)
    write_lines(f_fake, data)

print('-------------Input Data------------------')
print()
for line in data:
    print(line)
print()
print('-------------Mapping to Partitions, sorted and dumped to tmp storage------------------')
print()

with open(F_PATH, 'r') as f:
    # stream through the source file one line at a time
    for line in f:
        # collect lines in an in-memory buffer that holds one batch of records
        BUFFER.append(line.strip())
        # once the buffer reaches CHUNK_SIZE records, sort it and write it to
        # disk; in practice you could also bound the buffer by bytes with
        # sys.getsizeof
        if len(BUFFER) == CHUNK_SIZE:
            sort_buffer_and_dump_to_disk()
    # flush whatever is left over as a final, smaller chunk
    if BUFFER:
        sort_buffer_and_dump_to_disk()
print()
print('-------------Merge Data------------------')
print()

# open a file handle for each sorted chunk on disk
FILES = [open(OUTPUT_PATH_TEMPLATE % i, 'r') for i in range(CHUNK_CTR)]

# initialize a heap holding the current record (one line) from each file
heap = []
for idx, f in enumerate(FILES):
    heapq.heappush(heap, (next(f), idx))
while heap:
    assert len(heap) <= len(FILES), 'Heap should hold at most one line per file'
    # pop the smallest current record across all the chunk files
    item, f_idx = heapq.heappop(heap)
    # push the next record from the file you just popped from; if that file
    # is exhausted, close it instead
    try:
        new_item = next(FILES[f_idx])
        heapq.heappush(heap, (new_item, f_idx))
    except StopIteration:
        FILES[f_idx].close()
    # this is the record you'd write to the output file, one at a time; a
    # small helper function (or wrapping the merge step in a context manager)
    # would handle that
    print(item.strip())

# just in case any files were left open; can be removed in practice
for f in FILES:
    f.close()
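
# For comparison, the merge loop above is what the standard library's
# heapq.merge implements. Below is a minimal sketch of the same k-way merge,
# assuming the sorted chunk files written above are still on disk; the
# function name and the "sorted.csv" output path are illustrative choices,
# not part of the gist itself.
from contextlib import ExitStack

def merge_chunks_with_stdlib(n_chunks, out_path="sorted.csv"):
    # ExitStack guarantees every chunk file is closed even if the merge fails
    with ExitStack() as stack:
        chunk_files = [stack.enter_context(open(OUTPUT_PATH_TEMPLATE % i))
                       for i in range(n_chunks)]
        with open(out_path, 'w') as out:
            # heapq.merge lazily yields the globally smallest line across the
            # already-sorted inputs, keeping only one line per file in memory
            for line in heapq.merge(*chunk_files):
                out.write(line.rstrip('\n') + '\n')

merge_chunks_with_stdlib(CHUNK_CTR)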