import os
import sys
import threading
from pathlib import Path

import boto3
import magic
from boto3.s3.transfer import TransferConfig

class ProgressPercentage(object):

    def __init__(self, filename):
        self._filename = filename
        self._size = float(os.path.getsize(filename))
        self._seen_so_far = 0
        self._lock = threading.Lock()

    def __call__(self, bytes_amount):
        # To keep things simple, assume this callback is bound to a
        # single file.
        with self._lock:
            self._seen_so_far += bytes_amount
            percentage = (self._seen_so_far / self._size) * 100
            sys.stdout.write(
                "\r%s %s / %s (%.2f%%)" % (
                    self._filename, self._seen_so_far, self._size,
                    percentage))
            # stdout is buffered (it collects written data before handing it
            # to the terminal), so flush to make the progress line appear
            # immediately.
            sys.stdout.flush()
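
# A quick, hypothetical way to exercise the callback without touching S3
# ('some_file.bin' is a placeholder and must exist locally, since
# ProgressPercentage calls os.path.getsize on it):
#
#     progress = ProgressPercentage('some_file.bin')
#     progress(1024)  # prints "\r<name> 1024 / <size> (<pct>%)" on one line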


# The class below illustrates a concurrent multipart upload to S3.
# Resuming failed uploads is beyond the scope of this article.
class S3SimpleMultiPartUploadClient(object):

    def __init__(self):
        self.s3 = boto3.client('s3')

    def upload(self, bucket_name, s3_bucket_key_path,
               file_path, mime_type,
               multipart_threshold=1024 * 1024 * 25,  # 25 MB
               multipart_chunksize=1024 * 1024 * 25,  # 25 MB
               max_concurrency=8, use_threads=True):
"""
S3 boto3 Multipart upload
:param bucket_name:
:param s3_bucket_key_path:
:param file_path:
:param multipart_threshold: The transfer size threshold for which
multipart uploads, downloads, and copies will automatically be
triggered.
:param max_concurrency: The maximum number of threads that will be
making requests to perform a transfer. If ``use_threads`` is
set to ``False``, the value provided is ignored as the transfer
will only ever use the main thread.
:param multipart_chunksize: The partition size of each part for a
multipart transfer.
:param io_chunksize: The max size of each chunk in the io queue.
Currently, this is size used when ``read`` is called on the
downloaded stream as well.
:param use_threads: If True, threads will be used when performing
S3 transfers. If False, no threads will be used in
performing transfers: all logic will be ran in the main thread.
"""
        config = TransferConfig(multipart_threshold=multipart_threshold,
                                max_concurrency=max_concurrency,
                                multipart_chunksize=multipart_chunksize,
                                use_threads=use_threads)
        self.s3.upload_file(file_path, bucket_name, s3_bucket_key_path,
                            ExtraArgs={'ContentType': mime_type},
                            Config=config,
                            Callback=ProgressPercentage(file_path))


if __name__ == '__main__':
    file_path = '/Users/timothy.mugayi/Development/largeData/data/ml-latest/user_data.txt'  # 20GB file
    file_name = Path(file_path).name
    mime_type = magic.from_file(file_path, mime=True)

    s3_multipart_client = S3SimpleMultiPartUploadClient()
    s3_multipart_client.upload('saturncloud-ml-latest', 'data/{}'.format(file_name),
                               file_path, mime_type)
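
Note that "import magic" above comes from the python-magic package (pip install python-magic), which wraps the system libmagic library to detect a file's MIME type from its contents. If libmagic is unavailable, the standard-library mimetypes module is a rough substitute that guesses from the file extension instead — a minimal sketch:

    import mimetypes

    # Fall back to a generic binary type when the extension is unknown.
    mime_type, _ = mimetypes.guess_type(file_path)
    mime_type = mime_type or 'application/octet-stream'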