Skip to content

Instantly share code, notes, and snippets.

@Nihhaar
Last active January 11, 2020 19:50
Show Gist options
  • Save Nihhaar/f8f95d860e11469dc11a0e75b241e346 to your computer and use it in GitHub Desktop.
Save Nihhaar/f8f95d860e11469dc11a0e75b241e346 to your computer and use it in GitHub Desktop.
Upload content (bytes) to AWS S3 using the multipart upload API
from datetime import datetime
# Usage:
# with S3MultipartUpload(boto3.client('s3'), destination_bucket, destination_key) as s3upload:
#     # Do something
#     s3upload.upload(content)
#     # Do something
#     s3upload.upload(content)
#     # Do something
#     s3upload.flush()
#
# This usage pattern is required because an aborted multipart upload must be followed by a call to
# abort_multipart_upload() to delete the intermediate parts; otherwise we keep paying for their storage. Likewise,
# a successful multipart upload must be followed by a call to complete_multipart_upload() so that AWS assembles the
# final object from the uploaded parts; otherwise we pay for the intermediate parts without ever getting the final
# object. Using the class as a context manager ensures the appropriate call is made in each situation.
#
# Read more here: https://docs.aws.amazon.com/AmazonS3/latest/dev/mpuoverview.html
class S3MultipartUpload:
    """Stream bytes into a single S3 object through the multipart-upload API.

    Intended to be used as a context manager::

        with S3MultipartUpload(boto3.client("s3"), bucket, key) as s3upload:
            s3upload.upload(chunk)
            ...

    ``__enter__`` starts the multipart upload. ``upload()`` appends bytes to
    an in-memory buffer and sends the buffer as the next part whenever it
    holds at least ``part_size`` bytes.

    On a clean exit, ``__exit__`` sends any still-buffered bytes and then
    completes the upload. Calling ``flush()`` explicitly first is still
    supported. If the ``with`` block raised, or completion fails, the upload
    is aborted so S3 does not keep (and bill for) orphaned parts. If no bytes
    were uploaded at all, the unused multipart upload is aborted and an empty
    object is written with ``put_object``.

    Args:
        client: boto3 S3 client. Any object providing
            ``create_multipart_upload``, ``upload_part``,
            ``complete_multipart_upload``, ``abort_multipart_upload`` and
            ``put_object`` with boto3's keyword arguments works.
        bucket: Destination bucket name.
        key: Destination object key.
        part_size: Buffer size in bytes that triggers sending a part. S3
            rejects every part except the last one if it is smaller than
            5 MiB, so keep this at or above the default ``5 * 1024 * 1024``.
    """

    # A cumulative-progress line is printed roughly every this many bytes.
    _PROGRESS_STEP = 100000000

    def __init__(self, client, bucket, key, part_size=5 * 1024 * 1024):
        self.client = client
        self.bucket = bucket
        self.key = key
        self.part_size = part_size
        # Total number of bytes sent to S3 in uploaded parts so far.
        self.count_bytes = 0
        # Bytes received by upload() but not yet sent. This is a bytearray
        # rather than bytes: ``bytes += chunk`` copies the whole buffer on
        # every call, which is quadratic when many small chunks arrive.
        self.upload_bytes = bytearray()
        # PartNumber for the next part (S3 part numbers start at 1).
        self.part_counter = 1
        # Index of the next progress report; see upload().
        self.counter = 1
        # {"PartNumber", "ETag"} entries later passed to completion.
        self.parts = []
        # UploadId of the multipart upload, set by __enter__.
        self.mpuid = None

    def __enter__(self):
        """Start the multipart upload (unless an UploadId is already set) and return self."""
        if self.mpuid is None:
            response = self.client.create_multipart_upload(Bucket=self.bucket, Key=self.key)
            self.mpuid = response["UploadId"]
            print("{} : Initiated multi-part upload with ID: {} for {}".format(
                datetime.now(), self.mpuid, self.bucket + "/" + self.key))
        return self

    def __exit__(self, exc_type, exc_value, tb):
        """Complete the upload on a clean exit and abort it otherwise.

        Exceptions are never suppressed. An exception raised inside the
        ``with`` block, or one raised while flushing or completing, propagates
        after the multipart upload has been aborted.
        """
        if exc_type is not None:
            # Exception occurred inside the with-block: discard the uploaded parts.
            self._abort()
        else:
            try:
                # Send whatever is still buffered. This is a no-op if the
                # caller already called flush().
                self.flush()
                if self.parts:
                    print("{} : Completing multipart upload for key: {}".format(datetime.now(), self.key))
                    self.client.complete_multipart_upload(
                        Bucket=self.bucket,
                        Key=self.key,
                        UploadId=self.mpuid,
                        MultipartUpload={"Parts": self.parts},
                    )
            except BaseException:
                # Flushing or completion failed: abort so the parts are not
                # left behind, then re-raise the original error.
                self._abort()
                raise
            if not self.parts:
                # Nothing was uploaded. S3 rejects completing a multipart
                # upload with an empty part list, so drop the unused upload
                # and write an empty object instead.
                self._abort()
                self.client.put_object(Bucket=self.bucket, Key=self.key, Body=b"")
        print("{} : Done".format(datetime.now()))

    def upload(self, part_bytes):
        """Buffer ``part_bytes``, sending the buffer as a part once it reaches ``part_size`` bytes."""
        self.upload_bytes += part_bytes
        if len(self.upload_bytes) >= self.part_size:
            self._upload_buffer()
            # Report cumulative progress roughly every _PROGRESS_STEP bytes.
            if self.count_bytes / self.counter >= self._PROGRESS_STEP:
                print("{0} : Uploaded (cumulative) {1:.2f} MB".format(datetime.now(), self.count_bytes/1000000))
                self.counter += 1

    def flush(self):
        """Send any buffered bytes as the next (normally the final) part.

        S3 requires every part except the last to be at least 5 MiB, so call
        this only once no more data will be uploaded. ``__exit__`` calls it
        automatically on a clean exit.
        """
        if self.upload_bytes:
            self._upload_buffer()
            print("{0} : Uploaded (cumulative) {1:.2f} MB".format(datetime.now(), self.count_bytes/1000000))

    def _upload_buffer(self):
        """Send the current buffer as part ``part_counter``, record its ETag, and reset the buffer."""
        part = self.client.upload_part(
            Body=bytes(self.upload_bytes),
            Bucket=self.bucket,
            Key=self.key,
            UploadId=self.mpuid,
            PartNumber=self.part_counter,
        )
        self.parts.append({"PartNumber": self.part_counter, "ETag": part["ETag"]})
        self.part_counter += 1
        self.count_bytes += len(self.upload_bytes)
        self.upload_bytes = bytearray()

    def _abort(self):
        """Abort the multipart upload so S3 discards the parts uploaded so far."""
        print("{} : Aborting multipart upload with key: {}".format(datetime.now(), self.key))
        self.client.abort_multipart_upload(
            Bucket=self.bucket,
            Key=self.key,
            UploadId=self.mpuid,
        )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment