Created
December 13, 2011 10:02
-
-
Save mattbillenstein/1471499 to your computer and use it in GitHub Desktop.
boto parallel s3 upload script
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import gevent.monkey | |
gevent.monkey.patch_all() | |
import boto | |
import config | |
import gevent | |
import gevent.pool | |
import os | |
import sys | |
import time | |
import traceback | |
import optparse | |
from cStringIO import StringIO | |
CHUNK_SIZE = 5 * 1024 * 1024 | |
def get_connection():
    """Open a fresh S3 connection using the credentials from the config module."""
    key_id = config.aws['aws_access_key_id']
    secret = config.aws['aws_secret_access_key']
    return boto.connect_s3(aws_access_key_id=key_id, aws_secret_access_key=secret)
def upload_part(mp, fname, idx, offset): | |
f = open(fname) | |
f.seek(offset) | |
content = f.read(CHUNK_SIZE) | |
f.close() | |
success = False | |
for x in xrange(3): | |
try: | |
conn = get_connection() | |
bucket = conn.lookup(mp.bucket_name) | |
p = boto.s3.multipart.MultiPartUpload(bucket) | |
p.id = mp.id | |
p.key_name = mp.key_name | |
p.upload_part_from_file(StringIO(content), idx+1, replace=True) | |
success = True | |
break | |
except Exception, e: | |
print "Error in part upload - %s %s %s" % (fname, idx, offset) | |
print traceback.format_exc() | |
assert success, "Part failed - %s %s %s" % (fname, idx, offset) | |
def upload(options): | |
conn = get_connection() | |
bck = conn.create_bucket(options.bucket) | |
pool = gevent.pool.Pool(options.concurrency) | |
for fname in options.files: | |
if options.path == '.': | |
fpath = os.path.basename(fname) | |
else: | |
fpath = os.path.join(options.path, os.path.basename(fname)) | |
s = "Putting: %s -> %s/%s ..." % (fname, options.bucket, fpath), | |
print "%-80s" % (s), | |
sys.stdout.flush() | |
start = time.time() | |
size = os.stat(fname).st_size | |
if size > (CHUNK_SIZE*2) and options.concurrency > 1: | |
mp = bck.initiate_multipart_upload(fpath, reduced_redundancy=options.reduced_redundancy) | |
greenlets = [] | |
idx = offset = 0 | |
while offset < size: | |
greenlets.append( pool.spawn(upload_part, mp, fname, idx, offset) ) | |
idx += 1 | |
offset += CHUNK_SIZE | |
gevent.joinall(greenlets) | |
cmp = mp.complete_upload() | |
else: | |
key = bck.new_key(fpath) | |
f = open(fname) | |
key.set_contents_from_file(f, reduced_redundancy=options.reduced_redundancy, replace=True) | |
f.close() | |
size = float(size)/1024/1024 | |
elapsed = time.time() - start | |
print " %6.1f MiB in %.1fs (%d KiB/s)" % (size, elapsed, int(size*1000/elapsed)) | |
def main(argv):
    """Parse command-line arguments from argv and run the upload.

    Usage: prog [options] <bucket> <path> <files...>
    Exits with status 1 (after printing help) when fewer than three
    positional arguments are given.
    """
    parser = optparse.OptionParser()
    parser.set_usage('%prog [options] <bucket> <path> <files>')
    parser.add_option('-c', '--concurrency', dest='concurrency', type='int',
                      help='Number of parts to upload simultaneously', default=3)
    parser.add_option('-r', '--reduced_redundancy', dest='reduced_redundancy',
                      help='Use S3 reduced redundancy', action='store_true', default=False)

    # Bug fix: the argv parameter was previously ignored (parse_args() read
    # sys.argv directly). Passing argv[1:] honors the parameter; behavior is
    # identical for the existing main(sys.argv) call site.
    options, args = parser.parse_args(argv[1:])

    if len(args) < 3:
        parser.print_help()
        sys.exit(1)

    options.bucket = args[0]
    options.path = args[1]
    options.files = args[2:]

    upload(options)
# Script entry point: forward the raw argv vector to main().
if __name__ == '__main__':
    main(sys.argv)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi,
I've tried running your snippet (slight modifications, just added host, port and calling_format to get_connection) with boto 2.28.0 on my local Ceph cluster (version 0.80-702-g9bac31b). Unfortunately I get the error:
When I try to upload a smaller file using it, everything works fine.
And the file is there. So I've got the right permissions to create buckets and upload objects.
Any ideas why initiating the multipart upload doesn't work for me?
Perhaps I'm missing something, but Ceph's documentation doesn't seem to say anything helpful...
Any help would be appreciated.