Skip to content

Instantly share code, notes, and snippets.

@sajal
Created October 29, 2012 18:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save sajal/3975607 to your computer and use it in GitHub Desktop.
Save sajal/3975607 to your computer and use it in GitHub Desktop.
Output stream for S3
from disco.worker.classic.func import task_output_stream
class S3LineOutput(object):
    """
    Output stream that uploads reduce output to S3, one object per key.

    Every value passed to ``add`` is appended to a local temporary file that
    belongs to its key. ``close`` post-processes each temporary file and
    uploads it to the bucket, using the key as the S3 key name.

    Entries read from ``params``:

    * ``AWS_KEY``, ``AWS_SECRET``, ``BUCKET_NAME`` -- required.
    * ``gzip`` -- if true, the file is compressed with ``pigz`` before the
      upload and ".gz" is appended to the S3 key name (default False).
    * ``unique`` -- if true, the file is passed through ``sort -u -k1,1``
      before the upload (default False).
    * ``numeric_sort`` -- if set, the column handed to ``sort -r -n -k``
      before the upload (default None, meaning no numeric sort).
    """

    def __init__(self, stream, params, partition, url):
        """
        Read the configuration from ``params``.

        ``stream`` and ``url`` are accepted but not used by this class;
        ``partition`` is only stored on the instance.

        Raises KeyError if AWS_KEY, AWS_SECRET or BUCKET_NAME is missing.
        """
        self.aws_key = params["AWS_KEY"]
        self.aws_secret = params["AWS_SECRET"]
        self.bucket_name = params["BUCKET_NAME"]
        self.do_gzip = params.get("gzip", False)
        self.do_unique = params.get("unique", False)
        self.do_numeric_sort = params.get("numeric_sort")
        self.partition = partition
        # Maps each key to the open temporary file collecting its values.
        self.files = {}

    def add(self, key, val):
        """
        Append ``val`` to the temporary file of ``key``.

        The file is created on first use. ``val`` is written as-is; no
        separator or newline is added.
        """
        from tempfile import NamedTemporaryFile
        if key not in self.files:
            # delete=False: the file has to outlive its handle, because
            # close() hands the path to external tools and to the uploader.
            self.files[key] = NamedTemporaryFile(delete=False)
        self.files[key].write(val)

    def runcommand(self, command):
        """
        Run ``command`` and wait for it to finish.

        ``command`` is either a shell command line (string, executed through
        the shell) or an argument list/tuple (executed directly, without a
        shell). The command's stdout is captured rather than inherited.

        Raises subprocess.CalledProcessError if the command exits with a
        non-zero status; the captured stdout is attached to the exception.
        """
        import subprocess
        use_shell = not isinstance(command, (list, tuple))
        pipe = subprocess.Popen(command, stdout=subprocess.PIPE,
                                shell=use_shell, universal_newlines=True)
        output, _ = pipe.communicate()
        if pipe.returncode != 0:
            # Callers delete their input after this returns, so a failed
            # command must not pass silently.
            raise subprocess.CalledProcessError(pipe.returncode, command,
                                                output)

    def _remove_if_exists(self, filename):
        """Delete ``filename`` if it is present on disk."""
        import os
        if os.path.exists(filename):
            os.remove(filename)

    def _sort_to_new_file(self, filename, sort_options):
        """
        Run ``sort`` with ``sort_options`` on ``filename``.

        The result is written to "<filename>.sorted". The input file is
        removed only after sort succeeded; if sort fails, the partial
        output is removed and the error is re-raised.
        """
        import os
        targetfile = "%s.sorted" % (filename)
        try:
            # "-o" replaces the shell redirect so no shell is needed.
            self.runcommand(["sort"] + list(sort_options)
                            + ["-o", targetfile, filename])
        except Exception:
            self._remove_if_exists(targetfile)
            raise
        os.remove(filename)
        return targetfile

    def gzip(self, filename):
        """
        Compress ``filename`` with ``pigz --no-name``.

        Returns the name of the compressed file (``filename`` + ".gz").
        Raises subprocess.CalledProcessError if pigz fails.
        """
        self.runcommand(["pigz", "--no-name", filename])
        return filename + ".gz"

    def unique(self, filename):
        """
        Sort ``filename`` with ``sort -u -k1,1``, i.e. keep one line per
        distinct first field.

        Returns the name of the sorted file; the input file is removed.
        Raises subprocess.CalledProcessError if sort fails.
        """
        return self._sort_to_new_file(filename, ["-u", "-k1,1"])

    def numeric_sort(self, filename, col):
        """
        Sort ``filename`` numerically, in reverse order, on column ``col``
        (``sort -r -n -k<col>,<col>``).

        Returns the name of the sorted file; the input file is removed.
        Raises subprocess.CalledProcessError if sort fails.
        """
        return self._sort_to_new_file(
            filename, ["-r", "-n", "-k%s,%s" % (col, col)])

    def close(self):
        """
        Post-process and upload every collected file, then delete it.

        For each key the temporary file is closed, optionally de-duplicated,
        numerically sorted and gzipped (in that order, depending on the
        configuration), uploaded to the bucket and removed from disk.

        Temporary files are removed even when connecting, post-processing or
        uploading raises; the exception is propagated to the caller.
        """
        try:
            from boto.s3.connection import S3Connection
            from boto.s3.key import Key
            s3conn = S3Connection(self.aws_key, self.aws_secret)
            bucket = s3conn.get_bucket(self.bucket_name)
            for key in list(self.files):
                handle = self.files.pop(key)
                filename = handle.name
                # We don't need this file handle anymore.
                handle.close()
                try:
                    # One line per first field only.
                    if self.do_unique:
                        filename = self.unique(filename)
                    if self.do_numeric_sort:
                        filename = self.numeric_sort(filename,
                                                     self.do_numeric_sort)
                    s3_name = key
                    if self.do_gzip:
                        filename = self.gzip(filename)
                        s3_name = key + ".gz"
                    # Upload the file.
                    k = Key(bucket)
                    k.key = s3_name
                    k.set_contents_from_filename(filename)
                finally:
                    # Delete the local file whether or not the upload worked.
                    self._remove_if_exists(filename)
        finally:
            # Only non-empty when something above failed: do not leave the
            # temporary files of the unprocessed keys behind.
            for handle in self.files.values():
                handle.close()
                self._remove_if_exists(handle.name)
            self.files.clear()
def s3_line_output(stream, partition, url, params):
    """
    Build the S3LineOutput object for one output stream.

    The arguments are forwarded unchanged; note that S3LineOutput takes them
    in a different order (stream, params, partition, url).
    """
    # Imported at call time rather than at module level (presumably so the
    # function still works when it is shipped to a worker on its own --
    # TODO confirm).
    import s3lineoutput
    return s3lineoutput.S3LineOutput(stream, params, partition, url)
# Output stream chain: task_output_stream (imported from
# disco.worker.classic.func) followed by s3_line_output.
# NOTE(review): presumably meant to be passed as a Disco job's output stream
# setting -- confirm against the job definition.
s3_line_output_stream = (task_output_stream, s3_line_output)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment