Skip to content

Instantly share code, notes, and snippets.

@shaggyone
Last active December 16, 2021 06:23
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save shaggyone/0bae26b739e8b0b5e80daad8062f5f0c to your computer and use it in GitHub Desktop.
Save shaggyone/0bae26b739e8b0b5e80daad8062f5f0c to your computer and use it in GitHub Desktop.
Multipart upload to s3 io stream
# NOTE: I'm not a Python developer, so my solution may not be optimal.
# Feel free to use it. But please star it, so I can see that someone found it useful.
import boto3
import gzip
from io import BytesIO, RawIOBase
class UploadToS3Stream(RawIOBase):
    """Write-only raw stream that stores its data in S3 via a multipart upload.

    Written bytes are collected in an in-memory buffer.  As soon as the buffer
    holds more than ``chunk_size`` bytes it is uploaded as the next part.
    ``close()`` uploads whatever is left as the final part and completes the
    upload; ``abort()`` discards the buffer and cancels the upload instead.
    """

    def __init__(self, s3, bucket_name, remote_path):
        """Prepare the stream; no multipart upload is started yet.

        Args:
            s3: Object exposing ``Object(bucket, key)``, such as the value
                returned by ``boto3.resource('s3')``.
            bucket_name: Name of the destination bucket.
            remote_path: Key of the destination object.
        """
        super().__init__()
        self.s3 = s3
        self.bucket_name = bucket_name
        self.remote_path = remote_path

        self.output_buf = BytesIO()
        self.bytes_in_buf = 0
        self.total_bytes = 0
        # 5 MiB: S3 rejects non-final parts smaller than this, so the buffer
        # is only uploaded early once it has grown past this threshold.
        self.chunk_size = 5 * 1024 * 1024

        self.out_obj = self.s3.Object(self.bucket_name, self.remote_path)
        # The multipart upload is created lazily by the first part upload.
        self.mpu = None
        self.mpu_parts = None
        self.current_part = None

    def writable(self):
        """Return True: the stream accepts writes."""
        return True

    def write(self, bytes):
        """Buffer *bytes* and upload a part once the buffer is large enough.

        The parameter name shadows the builtin; it is kept unchanged so that
        existing keyword callers keep working.

        Returns:
            The number of bytes accepted (always the full input).

        Raises:
            ValueError: If the stream has already been closed or aborted.
        """
        if self.closed:
            raise ValueError("write to closed stream")
        # BytesIO.write reports the real byte count, which is also correct for
        # multi-byte memoryviews where len() would count items instead.
        written = self.output_buf.write(bytes)
        self.bytes_in_buf += written
        self.total_bytes += written
        self.autoflush()
        return written

    def autoflush(self):
        """Upload the buffer as a part when it exceeds ``chunk_size``."""
        if self.bytes_in_buf > self.chunk_size:
            self.flush()

    def flush(self):
        """Upload the buffer only if it has outgrown ``chunk_size``.

        Smaller buffers are deliberately kept in memory: wrappers may call
        ``flush()`` at any time, and uploading an undersized part that is not
        the last one would make the multipart upload invalid.
        """
        if self.bytes_in_buf > self.chunk_size:
            self.flush_using_mp_upload()

    def flush_using_mp_upload(self):
        """Upload the current buffer as the next part, whatever its size.

        Starts the multipart upload on first use.  Bookkeeping is updated only
        after the part was uploaded successfully, so a failed upload neither
        loses buffered data nor burns a part number.
        """
        if self.mpu is None:
            self.mpu = self.out_obj.initiate_multipart_upload()
            self.mpu_parts = []
            self.current_part = 0

        part_number = self.current_part + 1
        part = self.mpu.Part(part_number)
        upload = part.upload(Body=self.output_buf.getvalue())

        self.current_part = part_number
        self.mpu_parts.append({
            'ETag': upload['ETag'],
            'PartNumber': part_number,
        })
        self._reset_buffer()

    def _reset_buffer(self):
        """Empty the in-memory buffer."""
        self.output_buf.seek(0)
        self.output_buf.truncate()
        self.bytes_in_buf = 0

    def abort_mp_uploads(self):
        """Abort the multipart upload if one is in progress (idempotent)."""
        if self.mpu is not None:
            self.mpu.abort()
            # Forget the handle so that a repeated abort does nothing.
            self.mpu = None

    def abort(self):
        """Close the stream without publishing the object.

        Buffered data is dropped and any started multipart upload is aborted.
        """
        # Empty the buffer first: IOBase.close() calls flush(), which must not
        # send data for an upload that is about to be cancelled.
        self._reset_buffer()
        try:
            super().close()
        finally:
            self.abort_mp_uploads()

    def close(self):
        """Upload the remaining data, complete the upload and close the stream.

        Calling it again (including the implicit call from ``__del__``) or
        calling it after ``abort()`` does nothing.  The stream is marked
        closed even when finishing fails; call ``abort()`` afterwards to
        cancel the incomplete upload.
        """
        if self.closed:
            return
        try:
            # Send the tail.  With nothing buffered a part is still uploaded
            # when no part exists yet, so an empty stream creates an object;
            # otherwise an empty trailing part would be redundant.
            if self.bytes_in_buf > 0 or self.mpu is None:
                self.flush_using_mp_upload()
            self.mpu.complete(
                MultipartUpload={'Parts': self.mpu_parts},
                RequestPayer='requester'
            )
            # The upload is finished; a later abort() must not touch it.
            self.mpu = None
        finally:
            super().close()

    def isatty(self):
        """Return False: the stream is not an interactive terminal."""
        return False

    def readable(self):
        """Return False: the stream is write-only."""
        return False

    def seekable(self):
        """Return False: uploaded parts cannot be revisited."""
        return False

    def tell(self):
        """Return the total number of bytes written so far."""
        return self.total_bytes
if __name__ == '__main__':
    # Example: gzip a small JSON payload straight into an S3 object.
    s3 = boto3.resource('s3')
    uploader = UploadToS3Stream(s3, 'bucket_name', 'remote_path')
    try:
        # GzipFile does not close a file object it was handed, so leaving the
        # `with` block only finishes the gzip stream; the uploader stays open.
        with gzip.GzipFile(fileobj=uploader, mode='w') as compressor:
            # GzipFile is a binary stream: it takes bytes, not str.
            compressor.write(b'{"foo": "bar"}')
    except Exception:
        # Do not publish a truncated object.
        uploader.abort()
        raise
    else:
        # Uploads the remaining buffer and completes the upload.
        uploader.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment