Skip to content

Instantly share code, notes, and snippets.

@lucemia
Created November 10, 2013 15:08
Show Gist options
  • Save lucemia/7399288 to your computer and use it in GitHub Desktop.
Save lucemia/7399288 to your computer and use it in GitHub Desktop.
Split a file into line-aligned shards for MapReduce.
def _split_keep_newlines(content):
    """Split *content* into lines, each keeping its trailing newline.

    Unlike ``content.split('\\n')`` the separators are preserved, so joining
    the result reproduces *content* exactly. No empty trailing element is
    produced when *content* ends with a newline. Empty input yields ``[]``.
    Works for both byte strings and text strings.
    """
    newline = b'\n' if isinstance(content, bytes) else u'\n'
    pieces = content.split(newline)
    # After split(), the last piece is the text following the final newline;
    # it is an empty string when content ends with a newline (or is empty).
    tail = pieces.pop()
    lines = [piece + newline for piece in pieces]
    if tail:
        lines.append(tail)
    return lines


def _shard_sizes(total, shards):
    """Return a list of *shards* counts summing to *total*, as even as possible.

    The first ``total % shards`` shards receive one extra item.
    """
    base, extra = divmod(total, shards)
    return [base + 1 if i < extra else base for i in range(shards)]


class FileSplitPipe(base_handler.PipelineBase):
    """Pipeline that splits one file into ``shards`` line-aligned output files."""

    def run(self, input_path, output, shards):
        """Split ``input_path`` into ``shards`` files without breaking lines.

        Output paths are ``"<input_path>-<unix_timestamp>-<k>"`` for
        ``k`` in ``range(shards)``. Every path is created, even when it
        receives no lines. Lines are distributed contiguously and as evenly
        as possible, so concatenating the outputs in order reproduces the
        input byte-for-byte.

        Args:
            input_path: path of the file to split, opened via ``gcs.open``.
                It is also used as the prefix of the output file names.
            output: not used by this implementation.
                NOTE(review): kept for interface compatibility; confirm intent.
            shards: number of output files to create; must be >= 1.

        Yields:
            ``pipeline.common.Return`` wrapping the list of output paths.

        Raises:
            ValueError: if ``shards`` is less than 1.
        """
        import time

        if shards < 1:
            raise ValueError("shards must be >= 1, got %r" % (shards,))

        timestamp = int(time.time())
        output_files = ["%s-%s-%s" % (input_path, timestamp, k)
                        for k in range(shards)]

        # NOTE: the whole input is read into memory. An even split needs the
        # total line count before any shard is written.
        # TODO: stream with a two-pass read if inputs can exceed instance RAM.
        with gcs.open(input_path) as src:
            content = src.read()
        lines = _split_keep_newlines(content)
        # Empty string of the same type as content (bytes vs text) to join on.
        empty = content[:0]

        start = 0
        for path, size in zip(output_files, _shard_sizes(len(lines), shards)):
            with gcs.open(path, 'w') as dst:
                dst.write(empty.join(lines[start:start + size]))
            start += size

        yield pipeline.common.Return(output_files)
class TestFileSplitHandler(webapp2.RequestHandler):
    """Manual-test handler that launches a FileSplitPipe on a fixed input."""

    def get(self):
        """Start a 16-way split of a hard-coded file and redirect to its status page."""
        # Named `job` rather than `pipeline` so the local does not shadow the
        # module-level `pipeline` name.
        job = FileSplitPipe(
            '/migo/Cookie2Csv-15799777861189859799B-output',
            'migo',
            16,
        )
        job.start()
        status_url = job.base_path + "/status?root=" + job.pipeline_id
        self.redirect(status_url)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment