@blinsay
Created March 15, 2013 01:50
#!/usr/bin/env python
# encoding: utf-8
from eventlet import patcher, GreenPool
# Monkey-patch the standard library before importing boto so its sockets
# cooperate with greenthreads.
patcher.monkey_patch(all=True)

import sys

from eventlet.queue import Queue, Empty
from boto import connect_s3
from boto.s3.connection import S3Connection

class GreenPoolDownloader(object):

    def __init__(self, aws_id, aws_secret, greenthreads=80):
        self.aws_id = aws_id
        self.aws_secret = aws_secret
        self.pool = GreenPool(size=greenthreads)
        self.queue = Queue()
        self._broken_pipe = False

    @classmethod
    def from_boto_conn(cls, boto_conn, greenthreads=20):
        return cls(boto_conn.access_key, boto_conn.secret_key, greenthreads=greenthreads)

    def _new_conn(self, bucket_name):
        """
        Return a new connection to a bucket.
        """
        conn = S3Connection(self.aws_id, self.aws_secret)
        bucket = conn.get_bucket(bucket_name)
        return bucket

    def download_key(self, out_file):
        # Pop a (bucket, key) pair off the queue; exit quietly if nothing is left
        # or a previous worker already hit a broken pipe.
        try:
            bucket_name, key_name = self.queue.get_nowait()
        except Empty:
            return
        if self._broken_pipe:
            return
        bucket = self._new_conn(bucket_name)
        k = bucket.get_key(key_name)
        if k is None:
            raise ValueError("key {name} does not exist".format(name=key_name))
        try:
            k.get_contents_to_file(out_file)
            out_file.flush()
        except IOError as ioe:
            if ioe.errno == 32:  # EPIPE: the reader closed the pipe, so stop downloading
                self._broken_pipe = True
            else:
                raise
        self.queue.task_done()

    def download_all(self, bucket_name, prefix, out_file):
        bucket = self._new_conn(bucket_name)
        for key in bucket.list(prefix=prefix):
            if key.size != 0:
                self.queue.put((bucket_name, key.name))
        # Spawn one worker per queued key and wait for all of them to finish.
        while not self.queue.empty():
            self.pool.spawn_n(self.download_key, out_file)
        self.pool.waitall()

def parse_s3_uri(uri):
    """
    Split an S3 URI into a (bucket, path) tuple.
    """
    if not uri.startswith('s3://'):
        raise ValueError("invalid S3 uri: " + uri)
    uri = uri[5:]
    uri = uri.lstrip('/')
    bucket_name, path = uri.split('/', 1)
    return bucket_name, path
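
# Example (a sketch, not part of the original gist):
#
#   >>> parse_s3_uri('s3://my-bucket/logs/2013-03-14/part-')
#   ('my-bucket', 'logs/2013-03-14/part-')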

# NOTE: This is from the serial version. Kept here for when I break the concurrent
# version! Async I/O is hard.
def merge_to_file(s3_conn, uri, fp, join_with=None):
    """
    Given an S3 URI, merge the contents of all keys that match the object prefix
    into the given file. Keys are written to the file in an arbitrary order.

    If a `join_with` argument is given, it is written after every object
    downloaded from S3.
    """
    bucket_name, key_prefix = parse_s3_uri(uri)
    bucket = s3_conn.get_bucket(bucket_name)
    for key in bucket.list(prefix=key_prefix):
        if key.size:
            key.get_contents_to_file(fp)
            if join_with is not None:
                fp.write(join_with)
    fp.flush()
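
# Example use of the serial merge_to_file (a sketch, not part of the original gist;
# the bucket, prefix, and filename here are hypothetical):
#
#   with open('merged.txt', 'wb') as fp:
#       merge_to_file(connect_s3(), 's3://my-bucket/logs/2013-03-14/', fp, join_with='\n')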

def main(args):
    if len(args) < 1:
        print >> sys.stderr, "USAGE: s3merge s3_uri [s3_uri...]"
        sys.exit(-1)
    s3_conn = connect_s3()
    downloader = GreenPoolDownloader.from_boto_conn(s3_conn, greenthreads=20)
    for arg in args:
        try:
            bucket_name, key_prefix = parse_s3_uri(arg)
            downloader.download_all(bucket_name, key_prefix, sys.stdout)
        except Exception as e:
            print >> sys.stderr, "ERROR: %s" % e
            sys.exit(-1)


if __name__ == '__main__':
    main(sys.argv[1:])
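
# Example invocation (a sketch, not part of the original gist; the bucket and prefix
# are hypothetical). Assumes AWS credentials are available to boto via the usual
# environment variables or ~/.boto config:
#
#   ./s3merge.py s3://my-bucket/logs/2013-03-14/part- > merged.txt
#
# Piping the output to something like `head` closes stdout early; the EPIPE handling
# in download_key makes the remaining workers stop instead of crashing.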