@mtigas
Last active February 15, 2018 17:04
Command-line s3 uploader that uses the MultiPart (chunked) S3 upload functionality to parallelize and speed up large file uploads.
#!/usr/bin/env python
# coding=utf-8
#
# s3up.py
# 2010-2011, Mike Tigas
# http://mike.tig.as/
#
# Usage:
# s3up filename
# Uploads the given file to the DEFAULT_BUCKET (see below)
# at the following path:
# files/YYYYMMDD/(filename)
#
# s3up filename [remote_directory]
# As above, except to the given directory:
# (remote_directory)/(filename)
#
# s3up filename [bucket] [remote_filename] [cache_time]
# s3up filename [bucket] [remote_filename] [cache_time] [policy]
#
#
#
# Please set the following options below before using:
# AWS_ACCESS_KEY_ID
# AWS_SECRET_ACCESS_KEY
# DEFAULT_BUCKET
# BUCKET_CNAME (optional)
# UPLOAD_PARALLELIZATION
# CHUNKING_MIN_SIZE
# CHUNK_RETRIES
import sys
import traceback
from mimetypes import guess_type
from datetime import datetime, timedelta
from time import sleep
from urllib import urlencode
from boto.s3.connection import S3Connection
import os
from cStringIO import StringIO
from threading import Thread
from math import floor
AWS_ACCESS_KEY_ID = ''
AWS_SECRET_ACCESS_KEY = ''
# When only giving one or two args, the following bucket is used:
DEFAULT_BUCKET = ''
# Optional: a CNAME or CDN hostname that fronts DEFAULT_BUCKET (for example,
# "https://files.example.com"). When set, easy_up() prints uploaded-file URLs
# using it; leave empty to print plain s3.amazonaws.com URLs.
BUCKET_CNAME = ''
# Number of simultaneous upload threads to execute.
UPLOAD_PARALLELIZATION = 4
# Minimum size for a file chunk (except final chunk). Needs to be >= 5242880.
CHUNKING_MIN_SIZE = 5242880
# For robustness, we can retry uploading any chunk up to this many times. (Set to
# 1 or less to only attempt one upload per chunk.) Because we chunk large uploads,
# an error in a single chunk doesn't necessarily mean we need to re-upload the
# entire thing.
CHUNK_RETRIES = 10
# ========== "MultiPart" (chunked) upload utility methods ==========
def mem_chunk_file(local_file):
    """
    Given the file at `local_file`, returns a generator of CHUNKING_MIN_SIZE
    (default 5MB) StringIO file-like chunks for that file.
    """
    fstat = os.stat(local_file)
    fsize = fstat.st_size

    # Number of chunks, based on the configured minimum chunk size.
    num_chunks = int(floor(float(fsize) / float(CHUNKING_MIN_SIZE)))

    fp = open(local_file, 'rb')
    for i in range(num_chunks):
        if i == (num_chunks - 1):
            # Final chunk: read to the end of the file.
            size_hint = 0
        else:
            size_hint = fsize / num_chunks

        tfp = StringIO()
        tfp.writelines(fp.readlines(size_hint))
        tfp.seek(0)
        yield tfp
    fp.close()
def upload_worker(multipart_key, fp, index, headers=None):
    """
    Uploads a file chunk in a MultiPart S3 upload. If an error occurs uploading
    this chunk, retry up to `CHUNK_RETRIES` times.
    """
    success = False
    attempts = 0
    while not success:
        try:
            fp.seek(0)
            multipart_key.upload_part_from_file(fp, index, headers=headers)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            success = False
            attempts += 1
            if attempts >= CHUNK_RETRIES:
                break
            sleep(0.5)
        else:
            success = True

    if not success:
        raise Exception("Upload of chunk %d failed after %d attempts." % (index, CHUNK_RETRIES))

    fp.close()
def upload_chunk(arg_list):
    thread = Thread(
        target=upload_worker,
        args=arg_list
    )
    thread.daemon = False
    thread.start()
    return thread
# ========== Uploader methods ==========
def easy_up(local_file, rdir=None):
    if os.path.isfile(local_file):
        print "File:"
        print os.path.abspath(local_file)
        print

        if not rdir:
            rpath = "files/" + datetime.now().strftime("%Y%m%d")
        else:
            rpath = rdir

        remote_path = rpath + "/" + os.path.basename(local_file)

        upload_file(os.path.abspath(local_file), DEFAULT_BUCKET, remote_path, 0)

        print "File uploaded to:"
        if BUCKET_CNAME:
            print "%s/%s" % (BUCKET_CNAME, remote_path)
        else:
            print "https://s3.amazonaws.com/%s/%s" % (DEFAULT_BUCKET, remote_path)
        print
    else:
        print "Path given is not a file."
def upload_file(local_file, bucket, remote_path, cache_time=0, policy="public-read", force_download=False):
    # Expiration time:
    cache_time = int(cache_time)

    # Metadata that we need to pass in before attempting an upload.
    content_type = guess_type(local_file, False)[0] or "application/octet-stream"
    basic_headers = {
        "Content-Type": content_type,
    }
    if force_download:
        basic_headers["Content-Disposition"] = "attachment; filename=%s" % os.path.basename(local_file)

    # Set up a connection to S3.
    s3 = S3Connection(aws_access_key_id=AWS_ACCESS_KEY_ID, aws_secret_access_key=AWS_SECRET_ACCESS_KEY)
    bucket = s3.get_bucket(bucket)

    # Get info on the local file to determine whether it's large enough that we can perform
    # upload parallelization.
    fstat = os.stat(local_file)
    fsize = fstat.st_size

    # Can only chunk in increments of 5MB, so if the file size is smaller than that, fall back to
    # the "standard" upload procedure.
    if fsize <= CHUNKING_MIN_SIZE:
        print "Standard upload: File size is under %.1f MB\n" % (CHUNKING_MIN_SIZE / 1024**2)
        key = bucket.new_key(remote_path)
        key.content_type = content_type
        key.set_contents_from_filename(local_file, policy=policy, headers=basic_headers)
    else:
        print "Parallelized upload\n"
        mp_key = bucket.initiate_multipart_upload(remote_path, headers=basic_headers)
        try:
            # Chunk the given file into `CHUNKING_MIN_SIZE` (default: 5MB) chunks that can
            # be uploaded in parallel.
            chunk_generator = mem_chunk_file(local_file)

            # Use `UPLOAD_PARALLELIZATION` (default: 4) threads at a time to churn through
            # the `chunk_generator` queue.
            active_threads = []
            for i, chunk in enumerate(chunk_generator):
                args = (mp_key, chunk, i + 1, basic_headers)

                # If we don't have enough concurrent threads yet, spawn an upload thread to
                # handle this chunk.
                if len(active_threads) < UPLOAD_PARALLELIZATION:
                    # Upload this chunk in a background thread and hold on to the thread for polling.
                    t = upload_chunk(args)
                    active_threads.append(t)

                # Poll until an upload thread finishes before allowing more upload threads to spawn.
                while len(active_threads) >= UPLOAD_PARALLELIZATION:
                    for thread in active_threads:
                        # Join and remove threads that have completed.
                        if not thread.isAlive():
                            thread.join()
                            active_threads.remove(thread)

                    # A small polling delay so we aren't constantly spinning and taxing the CPU.
                    sleep(0.1)

            # We've exhausted the queue, so join all of our threads so that we wait on the last pieces
            # to complete uploading.
            for thread in active_threads:
                thread.join()
        except:
            # Since we have threads running around and possibly partial data up on the server,
            # we need to clean up before propagating an exception.
            sys.stderr.write("Exception! Waiting for existing child threads to stop.\n\n")
            for thread in active_threads:
                thread.join()

            # Remove any already-uploaded chunks from the server.
            mp_key.cancel_upload()
            for mp in bucket.list_multipart_uploads():
                if mp.key_name == remote_path:
                    mp.cancel_upload()

            # Propagate the error.
            raise
        else:
            # We finished the upload successfully.
            mp_key.complete_upload()
            key = bucket.get_key(mp_key.key_name)
    # ===== / chunked upload =====

    if cache_time != 0:
        key.set_metadata('Cache-Control', 'max-age=%d, must-revalidate' % int(cache_time))
    else:
        key.set_metadata('Cache-Control', 'no-cache, no-store')

    if policy == "public-read":
        key.make_public()
    else:
        key.set_canned_acl(policy)
def main(args):
    if len(args) == 5:
        upload_file(args[0], args[1], args[2], args[3], args[4])
    elif len(args) == 4:
        upload_file(args[0], args[1], args[2], args[3])
    elif len(args) == 3:
        upload_file(args[0], args[1], args[2])
    elif len(args) == 2:
        easy_up(args[0], args[1])
    elif len(args) == 1:
        easy_up(args[0], None)
    else:
        print "Usage:"
        print "s3up filename"
        print "    Uploads the given file to DEFAULT_BUCKET (%s) at the following path:" % DEFAULT_BUCKET
        print "    files/YYYYMMDD/(filename)"
        print
        print "s3up filename [remote_directory]"
        print "    As above, except the file is uploaded to the given directory:"
        print "    (remote_directory)/(filename)"
        print
        print "s3up filename [bucket] [remote_filename] [cache_time]"
        print
        print "s3up filename [bucket] [remote_filename] [cache_time] [policy]"
        print
if __name__ == '__main__':
    try:
        main(sys.argv[1:])
    except Exception, e:
        sys.stderr.write('\n')
        traceback.print_exc(file=sys.stderr)
        sys.stderr.write('\n')
        sys.exit(1)
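
Besides the command-line entry point, the script can also be driven from Python directly. The snippet below is an illustrative sketch only (not part of the gist): it assumes s3up.py is importable from the current directory, that the AWS_* credentials and DEFAULT_BUCKET above have been filled in, and the file name and bucket name are made-up placeholders.

    # Illustrative usage sketch (not part of the original gist). Assumes s3up.py
    # is on the import path and the AWS_* / DEFAULT_BUCKET settings are filled in.
    import s3up

    # Equivalent to running `s3up backup.tar.gz`: uploads to DEFAULT_BUCKET at
    # files/YYYYMMDD/backup.tar.gz (the file name is hypothetical).
    s3up.easy_up("backup.tar.gz")

    # Or target an explicit bucket/key with a one-hour cache lifetime and a
    # non-public canned ACL ("example-bucket" is a placeholder).
    s3up.upload_file(
        "backup.tar.gz",
        "example-bucket",
        "backups/backup.tar.gz",
        cache_time=3600,
        policy="private",
    )
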
@sajal commented Jan 4, 2012

<Error><Code>EntityTooSmall</Code><Message>Your proposed upload is smaller than the minimum allowed size</Message><ETag>xxxxxxxxxxx</ETag><MinSizeAllowed>5242880</MinSizeAllowed><ProposedSize>1025407</ProposedSize><RequestId>xxxxxxxxxx</RequestId><HostId>xxxxxxxxxxxxx</HostId><PartNumber>4719</PartNumber></Error>

when trying this with a 24GB file ... based on my data transfer log, this was probably the last chunk

@mtigas (author) commented Jan 4, 2012

S3 always ignores the minimum-size requirement for the last chunk, so this is either due to some counting error regarding the number of chunks the file actually needs, or the file was modified while being read.

I'll experiment with this to make sure I haven't done anything stupid here.

@sajal commented Jan 4, 2012

I also tried a much higher value for CHUNKING_MIN_SIZE, one where filesize mod CHUNKING_MIN_SIZE = 0.

Then one time I got an internal server error, and the next time, after the upload, nothing happened: no more network activity, nothing, so I killed it. Finally I split the file into 4GB chunks and uploaded it via s3cmd. FWIW, I was uploading from an EC2 instance to S3.

@dknochen

This upload works properly only for text files, as "file.readlines" is used ("If given an optional parameter sizehint, it reads that many bytes from the file and enough more to complete a line, and returns the lines from that."). Therefore, the chunk sizes are only approximate.

To make the chunking exact for binary files, you can simply replace:

        if i == (num_chunks-1):
            size_hint = 0
        else:
            size_hint = fsize / num_chunks

        tfp = StringIO()
        tfp.writelines(fp.readlines(size_hint))
        tfp.seek(0)
        yield tfp

with

        size = fsize / num_chunks
        tfp = StringIO()
        if i == (num_chunks-1):
            tfp.write(fp.read())
        else:
            tfp.write(fp.read(size))
        tfp.seek(0)
        yield tfp
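
The line-based overshoot is easy to demonstrate (an illustrative sketch, not from the original thread): readlines(sizehint) always reads whole lines, so on binary data with few or no newlines a single "chunk" can swallow far more than sizehint bytes, exhausting the file early and leaving later parts short or empty, which is how a non-final part can end up under the 5 MB minimum.

    # Illustrative sketch: readlines(sizehint) reads *whole lines*, so on data
    # without newlines it returns far more than sizehint bytes in one call.
    from cStringIO import StringIO

    data = "\x00" * (12 * 1024 * 1024)   # 12 MB of newline-free "binary" data
    fp = StringIO(data)

    lines = fp.readlines(5242880)        # ask for roughly 5 MB...
    print len(lines[0])                  # ...but get all 12582912 bytes back,
                                         # since there is no newline to stop at
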

@pdmn commented Aug 20, 2012

Traceback (most recent call last):
  File "s3up.py", line 284, in <module>
    main(sys.argv[1:])
  File "s3up.py", line 264, in main
    easy_up(args[0],args[1])
  File "s3up.py", line 148, in easy_up
    if BUCKET_CNAME:
NameError: global name 'BUCKET_CNAME' is not defined

I see this message when I am uploading a 6 GB file. Am I missing some arguments?
Where should I set BUCKET_CNAME?
Please help...
