Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
boto3 S3 Multipart Upload with the ability to resume the upload after a failure
#!/usr/bin/env python3
# See https://gist.github.com/teasherm/bb73f21ed2f3b46bc1c2ca48ec2c1cf5
import argparse
import os
import boto3
class S3MultipartUpload(object):
    """Upload a local file to S3 as a multipart upload that can be resumed.

    Typical flow: ``create()`` a new upload (or reuse a known upload id),
    ``get_uploaded_parts()`` to learn what S3 already holds, ``upload()`` to
    send whatever is missing, then ``complete()`` to assemble the object.
    """

    # S3 answers EntityTooSmall for any part other than the last one that is
    # smaller than 5 MiB.
    PART_MINIMUM = 5 * 1024 * 1024

    def __init__(self,
                 bucket,
                 key,
                 local_path,
                 part_size=int(15e6),
                 profile_name=None,
                 region_name="eu-west-1",
                 verbose=False):
        """Record the upload target and build the S3 client.

        Args:
            bucket: Name of the destination bucket.
            key: Object key to upload to.
            local_path: Path of the local file to upload.
            part_size: Size in bytes of every part except the last one.
            profile_name: Optional boto3 profile to take credentials from.
            region_name: Region the S3 client is created for.
            verbose: When true, turn on botocore stream logging.

        Raises:
            OSError: If ``local_path`` cannot be stat'ed.
            ValueError: If ``part_size`` is below ``PART_MINIMUM``.
        """
        self.bucket = bucket
        self.key = key
        self.path = local_path
        self.total_bytes = os.stat(local_path).st_size
        self.part_bytes = part_size
        # Explicit raise instead of ``assert`` so the check survives ``-O``.
        # Only the part size is validated: the last part of a multipart
        # upload may be arbitrarily small, so the file size is not checked.
        if part_size < self.PART_MINIMUM:
            raise ValueError(
                "part_size must be at least {0} bytes, got {1}".format(
                    self.PART_MINIMUM, part_size))
        self.s3 = boto3.session.Session(
            profile_name=profile_name, region_name=region_name).client("s3")
        if verbose:
            boto3.set_stream_logger(name="botocore")

    def abort_all(self):
        """Abort every in-progress multipart upload in the bucket.

        Returns:
            A list with the ``abort_multipart_upload`` response of each
            aborted upload.
        """
        uploads = []
        request = {"Bucket": self.bucket}
        # A single listing response is limited in size; follow the markers
        # until the listing is no longer truncated.
        while True:
            page = self.s3.list_multipart_uploads(**request)
            uploads.extend(page.get("Uploads", []))
            if not page.get("IsTruncated"):
                break
            request["KeyMarker"] = page.get("NextKeyMarker", "")
            request["UploadIdMarker"] = page.get("NextUploadIdMarker", "")
        print("Aborting", len(uploads), "uploads")
        aborted = []
        for entry in uploads:
            # Each upload must be aborted under its own key; using self.key
            # for all of them makes S3 answer NoSuchUpload for foreign keys.
            aborted.append(
                self.s3.abort_multipart_upload(
                    Bucket=self.bucket,
                    Key=entry["Key"],
                    UploadId=entry["UploadId"]))
        return aborted

    def get_uploaded_parts(self, upload_id):
        """Return the parts S3 has already stored for ``upload_id``.

        Args:
            upload_id: Id of the multipart upload to inspect.

        Returns:
            A list of the part entries reported by ``list_parts`` (they carry
            PartNumber, ETag, Size [bytes], ...), across all result pages.
        """
        parts = []
        request = {
            "Bucket": self.bucket,
            "Key": self.key,
            "UploadId": upload_id,
        }
        while True:
            res = self.s3.list_parts(**request)
            parts.extend(res.get("Parts", []))
            if not res.get("IsTruncated"):
                break
            request["PartNumberMarker"] = res["NextPartNumberMarker"]
        return parts

    def create(self):
        """Start a new multipart upload and return its upload id."""
        mpu = self.s3.create_multipart_upload(Bucket=self.bucket, Key=self.key)
        return mpu["UploadId"]

    def upload(self, mpu_id, parts=None):
        """Upload every part of the file that S3 does not hold yet.

        Args:
            mpu_id: Id of the multipart upload to add parts to.
            parts: Optional list of already uploaded parts, as returned by
                ``get_uploaded_parts``. The list is not modified.

        Returns:
            A new list of ``{"PartNumber": ..., "ETag": ...}`` dicts, one per
            part of the local file, in ascending part order.

        Raises:
            ValueError: If an already uploaded part has a different size than
                the corresponding chunk of the local file.
        """
        # Look parts up by number rather than by list position so the result
        # does not depend on how the caller ordered the list.
        known = {p["PartNumber"]: p for p in (parts or [])}
        result = []
        uploaded_bytes = 0
        with open(self.path, "rb") as f:
            part_number = 1
            while True:
                data = f.read(self.part_bytes)
                if not data:
                    break
                existing = known.get(part_number)
                if existing is not None:
                    # Already uploaded: verify the size (when known) and
                    # reuse the ETag instead of sending the data again.
                    remote_size = existing.get("Size")
                    if remote_size is not None and remote_size != len(data):
                        raise ValueError(
                            "Size mismatch: local {0}, remote: {1}".format(
                                len(data), remote_size))
                    etag = existing["ETag"]
                else:
                    # We could include `ContentMD5='hash'` to discover if data
                    # has been corrupted upon transfer
                    response = self.s3.upload_part(
                        Body=data,
                        Bucket=self.bucket,
                        Key=self.key,
                        UploadId=mpu_id,
                        PartNumber=part_number)
                    etag = response["ETag"]
                result.append({"PartNumber": part_number, "ETag": etag})
                uploaded_bytes += len(data)
                print("{0} of {1} bytes uploaded ({2:.3f}%)".format(
                    uploaded_bytes, self.total_bytes,
                    as_percent(uploaded_bytes, self.total_bytes)))
                part_number += 1
        return result

    def complete(self, mpu_id, parts):
        """Ask S3 to assemble the uploaded parts into the final object.

        Args:
            mpu_id: Id of the multipart upload to complete.
            parts: List of ``{"PartNumber": ..., "ETag": ...}`` dicts, as
                returned by ``upload``.

        Returns:
            The ``complete_multipart_upload`` response.
        """
        return self.s3.complete_multipart_upload(
            Bucket=self.bucket,
            Key=self.key,
            UploadId=mpu_id,
            MultipartUpload={"Parts": parts})
# Helper
def as_percent(num, denom):
    """Return ``num`` as a percentage of ``denom`` (both coerced to float)."""
    ratio = float(num) / float(denom)
    return ratio * 100.0
def parse_args():
    """Parse the command-line options of the upload script.

    ``--bucket``, ``--key`` and ``--path`` are mandatory. ``--region``
    defaults to ``eu-west-1``; ``--profile`` and ``--uploadid`` default to
    ``None``. Passing ``--uploadid`` continues a previous upload.
    """
    parser = argparse.ArgumentParser(description='Multipart upload')
    for required_flag in ('--bucket', '--key', '--path'):
        parser.add_argument(required_flag, required=True)
    optional_flags = (
        ('--region', "eu-west-1"),
        ('--profile', None),
        ('--uploadid', None),  # To continue a previous upload
    )
    for flag, fallback in optional_flags:
        parser.add_argument(flag, default=fallback)
    return parser.parse_args()
def main():
    """Run the script: start a fresh multipart upload or continue one.

    With ``--uploadid`` the parts already stored for that upload are fetched
    and only the remaining ones are sent. Without it, ``abort_all()`` is
    called first, a new upload is created and every part is sent. In both
    cases the upload is completed and the response printed.
    """
    args = parse_args()
    uploader = S3MultipartUpload(
        args.bucket,
        args.key,
        args.path,
        profile_name=args.profile,
        region_name=args.region)
    mpu_id = args.uploadid
    if mpu_id is None:
        # abort all multipart uploads for this bucket (optional, for
        # starting over)
        uploader.abort_all()
        # create new multipart upload
        mpu_id = uploader.create()
        print("Starting upload with id=", mpu_id)
        # upload parts
        parts = uploader.upload(mpu_id)
    else:
        print("Continuing upload with id=", mpu_id)
        finished_parts = uploader.get_uploaded_parts(mpu_id)
        parts = uploader.upload(mpu_id, finished_parts)
    # complete multipart upload
    print(uploader.complete(mpu_id, parts))
# Run the upload only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
@vishalvikash93

This comment has been minimized.

Copy link

@vishalvikash93 vishalvikash93 commented Aug 13, 2020

@holyjak thanks for the code. I am getting the error below.
botocore.errorfactory.NoSuchUpload: An error occurred (NoSuchUpload) when calling the AbortMultipartUpload operation: The specified upload does not exist. The upload ID may be invalid, or the upload may have been aborted or completed.

@holyjak

This comment has been minimized.

Copy link
Owner Author

@holyjak holyjak commented Aug 13, 2020

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.