Last active
January 11, 2020 19:50
-
-
Save Nihhaar/f8f95d860e11469dc11a0e75b241e346 to your computer and use it in GitHub Desktop.
Upload content (bytes) to AWS S3 using the multipart upload API
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from datetime import datetime | |
# Usage:
# with S3MultipartUpload(boto3.client('s3'), destination_bucket, destination_key) as s3upload:
#     # Do something
#     s3upload.upload(content)
#     # Do something
#     s3upload.upload(content)
#     # Do something
#     s3upload.flush()
#
# The context-manager form is required because every multipart upload must be finalized.
# If the upload fails, abort_multipart_upload() must be called to delete the parts already
# uploaded; otherwise we keep paying for their storage. If the upload succeeds,
# complete_multipart_upload() must be called so that AWS assembles the final object from the
# parts; otherwise we pay for the parts' storage without ever getting the object. The context
# manager makes sure the appropriate call is made in each situation.
#
# Read more here: https://docs.aws.amazon.com/AmazonS3/latest/dev/mpuoverview.html
class S3MultipartUpload:
    """Upload a stream of byte chunks to a single S3 object via multipart upload.

    Intended to be used as a context manager::

        with S3MultipartUpload(boto3.client("s3"), bucket, key) as s3upload:
            s3upload.upload(chunk)
            ...

    Chunks passed to ``upload()`` are buffered and sent as one part whenever
    the buffer holds at least ``part_size`` bytes. On a clean exit from the
    ``with`` block, any bytes still buffered are sent and the upload is
    completed. If the block raises, or finishing the upload fails, the upload
    is aborted so the parts already stored are discarded. Calling ``flush()``
    explicitly before leaving the block is allowed but no longer required.

    Args:
        client: A boto3 S3 client. Any object exposing
            ``create_multipart_upload``, ``upload_part``,
            ``complete_multipart_upload`` and ``abort_multipart_upload`` with
            boto3's keyword arguments also works.
        bucket: Destination bucket name.
        key: Destination object key.
        part_size: Buffer threshold, in bytes, at which a part is uploaded.
            S3 requires every part except the last one to be at least 5 MiB.
            Smaller values therefore only work when the whole upload fits in
            a single part.
    """

    # Smallest size S3 accepts for any part other than the last one (5 MiB).
    # The previous default, 5e6 bytes, is below this limit and made S3 reject
    # completion with EntityTooSmall.
    MIN_PART_SIZE = 5 * 1024 * 1024
    # Cumulative-progress message interval, in bytes.
    _PROGRESS_STEP = 100000000

    def __init__(self, client, bucket, key, part_size=MIN_PART_SIZE):
        self.client = client
        self.bucket = bucket
        self.key = key
        self.part_size = part_size
        self.count_bytes = 0  # total bytes already sent to S3
        # Data received but not yet sent. A bytearray avoids the quadratic
        # cost of repeated `bytes +=` when callers pass many small chunks.
        self.upload_bytes = bytearray()
        self.part_counter = 1  # PartNumber for the next part (S3 part numbers start at 1)
        self.counter = 1  # next progress milestone, in multiples of _PROGRESS_STEP
        self.parts = []  # {"PartNumber", "ETag"} records needed to complete the upload
        self.mpuid = None  # UploadId returned by create_multipart_upload

    def __enter__(self):
        """Start the multipart upload (only once) and return ``self``."""
        if self.mpuid is None:
            self.mpuid = self.client.create_multipart_upload(Bucket=self.bucket, Key=self.key)["UploadId"]
            print("{} : Initiated multi-part upload with ID: {} for {}".format(datetime.now(), self.mpuid, self.bucket + "/" + self.key))
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Complete the upload on a clean exit; abort it otherwise.

        Exceptions raised inside the ``with`` block are never suppressed.
        If sending the remaining buffer or completing the upload fails, the
        upload is aborted and that error is re-raised.
        """
        if exc_type is None:
            try:
                # Send whatever is still buffered, so a caller who forgets
                # flush() does not silently lose the tail of the data.
                self.flush()
                print("{} : Completing multipart upload for key: {}".format(datetime.now(), self.key))
                self.client.complete_multipart_upload(
                    Bucket=self.bucket,
                    Key=self.key,
                    UploadId=self.mpuid,
                    MultipartUpload={"Parts": self.parts})
            except BaseException:
                # Completion failed (S3 rejects, e.g., an upload with no
                # parts). Abort so the stored parts are not left behind to be
                # billed, then propagate the original error.
                self._abort()
                raise
        else:
            # An exception occurred in the with-block, so abort the upload.
            self._abort()
        print("{} : Done".format(datetime.now()))
        return False

    def upload(self, part_bytes):
        """Buffer ``part_bytes``; send a part once ``part_size`` bytes are buffered."""
        self.upload_bytes += part_bytes
        if len(self.upload_bytes) >= self.part_size:
            self._upload_buffer()
            # Report progress roughly every _PROGRESS_STEP bytes.
            if self.count_bytes / self.counter >= self._PROGRESS_STEP:
                print("{0} : Uploaded (cumulative) {1:.2f} MB".format(datetime.now(), self.count_bytes/1000000))
                self.counter += 1

    def flush(self):
        """Send any buffered bytes as a part. Does nothing if the buffer is empty."""
        if not self.upload_bytes:
            return
        self._upload_buffer()
        print("{0} : Uploaded (cumulative) {1:.2f} MB".format(datetime.now(), self.count_bytes/1000000))

    def _upload_buffer(self):
        """Send the whole buffer as the next part and record its ETag."""
        body = bytes(self.upload_bytes)
        part = self.client.upload_part(Body=body, Bucket=self.bucket, Key=self.key,
                                       UploadId=self.mpuid, PartNumber=self.part_counter)
        self.parts.append({"PartNumber": self.part_counter, "ETag": part["ETag"]})
        self.part_counter += 1
        self.count_bytes += len(body)
        self.upload_bytes = bytearray()

    def _abort(self):
        """Abort the multipart upload so S3 discards the parts already stored."""
        print("{} : Aborting multipart upload with key: {}".format(datetime.now(), self.key))
        self.client.abort_multipart_upload(
            Bucket=self.bucket,
            Key=self.key,
            UploadId=self.mpuid
        )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment