Skip to content

Instantly share code, notes, and snippets.

@sajal
Created October 19, 2012 17:31
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sajal/3919506 to your computer and use it in GitHub Desktop.
S3Output for storing disco reduce items directly to Amazon S3
from disco.worker.classic.func import task_output_stream
class S3Output(object):
    """
    Output stream for S3.

    Each reduce output ``(key, val)`` pair passed to :meth:`add` is uploaded
    as its own S3 object: ``key`` becomes the object name and ``val`` its
    contents. If the ``gzip`` param is true, ``val`` is gzip-compressed
    before uploading and ``".gz"`` is appended to the object name.

    AWS_KEY, AWS_SECRET and BUCKET_NAME in params are required.
    """

    def __init__(self, stream, params):
        """
        :param stream: the task output stream handed in by the caller; it is
            not used, since all data is written to S3 instead.
        :param params: mapping with required "AWS_KEY", "AWS_SECRET" and
            "BUCKET_NAME" entries (KeyError if any is missing) and an
            optional "gzip" flag (defaults to False).
        """
        self.aws_key = params["AWS_KEY"]
        self.aws_secret = params["AWS_SECRET"]
        self.bucket_name = params["BUCKET_NAME"]
        self.go_gzip = params.get("gzip", False)
        # Created lazily on the first add() and then reused, rather than
        # reconnecting and re-fetching the bucket for every output item.
        self._s3conn = None
        self._bucket = None

    def _get_bucket(self):
        """Return the target bucket, connecting to S3 on first use."""
        if self._bucket is None:
            from boto.s3.connection import S3Connection
            self._s3conn = S3Connection(self.aws_key, self.aws_secret)
            self._bucket = self._s3conn.get_bucket(self.bucket_name)
        return self._bucket

    def add(self, key, val):
        """
        Upload ``val`` to S3 under the object name ``key``.

        When gzip is enabled the contents are compressed in memory and the
        object name becomes ``key + ".gz"``.
        """
        from boto.s3.key import Key
        k = Key(self._get_bucket())
        if self.go_gzip:
            import gzip
            try:
                from cStringIO import StringIO
            except ImportError:  # Python 3: no cStringIO module
                from io import BytesIO as StringIO
            out = StringIO()
            f = gzip.GzipFile(fileobj=out, mode='w')
            try:
                f.write(val)
            finally:
                # Closing flushes the compressed data and gzip trailer
                # into ``out``.
                f.close()
            # Rewind so the upload reads the buffer from the beginning.
            out.seek(0)
            k.key = key + ".gz"
            k.set_contents_from_file(out)
        else:
            k.key = key
            k.set_contents_from_string(val)

    def close(self):
        """Drop the cached S3 connection and bucket handle."""
        self._s3conn = None
        self._bucket = None
def s3_output(stream, partition, url, params):
    """
    Build the S3Output writer for this task.

    ``partition`` and ``url`` are accepted but ignored; presumably they are
    part of the call signature the caller expects (TODO confirm). S3Output is
    imported from the ``s3output`` module inside the function body rather
    than taken from module scope, matching the original code.
    """
    from s3output import S3Output as output_class
    writer = output_class(stream, params)
    return writer


# Stream chain: task_output_stream (from disco.worker.classic.func) followed
# by the s3_output factory above.
s3_output_stream = (task_output_stream, s3_output)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment