import os
import sys
import threading
from pathlib import Path

import boto3
import magic  # python-magic; used to detect the file's MIME type
from boto3.s3.transfer import TransferConfig
class ProgressPercentage(object):

    def __init__(self, filename):
        self._filename = filename
        self._size = float(os.path.getsize(filename))
        self._seen_so_far = 0
        self._lock = threading.Lock()

    def __call__(self, bytes_amount):
        # To simplify, we'll assume this is hooked up to a single filename.
        with self._lock:
            self._seen_so_far += bytes_amount
            percentage = (self._seen_so_far / self._size) * 100
            sys.stdout.write(
                "\r%s %s / %s (%.2f%%)" % (
                    self._filename, self._seen_so_far, self._size,
                    percentage))
            # Standard out is buffered: it collects some of the data
            # "written" to it before writing to the terminal. Calling
            # sys.stdout.flush() forces everything in the buffer out to
            # the terminal, so the progress line updates live.
            sys.stdout.flush()
# The class below illustrates a concurrent multipart upload to S3.
# Resuming failed uploads goes beyond the scope of this article.
class S3SimpleMultiPartUploadClient(object):

    def __init__(self):
        self.s3 = boto3.client('s3')

    def upload(self, bucket_name, s3_bucket_key_path,
               file_path, mime_type, multipart_threshold=1024 * 25,
               multipart_chunksize=1024 * 25, max_concurrency=8, use_threads=True):
        """
        S3 boto3 multipart upload
        :param bucket_name: name of the target S3 bucket
        :param s3_bucket_key_path: object key the file is uploaded to
        :param file_path: path of the local file to upload
        :param mime_type: MIME type stored as the object's ContentType
        :param multipart_threshold: The transfer size threshold for which
            multipart uploads, downloads, and copies will automatically be
            triggered.
        :param multipart_chunksize: The partition size of each part for a
            multipart transfer.
        :param max_concurrency: The maximum number of threads that will be
            making requests to perform a transfer. If ``use_threads`` is
            set to ``False``, the value provided is ignored as the transfer
            will only ever use the main thread.
        :param use_threads: If True, threads will be used when performing
            S3 transfers. If False, no threads will be used in
            performing transfers: all logic will be run in the main thread.
        """
        # Note: S3 requires multipart parts of at least 5 MiB (except the
        # last part), so boto3 adjusts chunk sizes below that minimum upward.
        config = TransferConfig(multipart_threshold=multipart_threshold,
                                max_concurrency=max_concurrency,
                                multipart_chunksize=multipart_chunksize,
                                use_threads=use_threads)
        self.s3.upload_file(file_path, bucket_name, s3_bucket_key_path,
                            ExtraArgs={'ContentType': mime_type},
                            Config=config,
                            Callback=ProgressPercentage(file_path))
if __name__ == '__main__':
    file_path = '/Users/timothy.mugayi/Development/largeData/data/ml-latest/user_data.txt'  # 20 GB file
    file_name = Path(file_path).name
    mime_type = magic.from_file(file_path, mime=True)

    s3_multipart_client = S3SimpleMultiPartUploadClient()
    s3_multipart_client.upload('saturncloud-ml-latest', 'data/{}'.format(file_name),
                               file_path, mime_type)
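The same Callback hook also works for downloads, which the docstring above alludes to. ProgressPercentage sizes the file with os.path.getsize(), which only works when the file already exists locally; for a download you can fetch the object's size from S3 with head_object first. Below is a minimal sketch of that idea, not part of the original gist: the helper name download_with_progress and the callback class are mine, and the TransferConfig values mirror the upload defaults above.

import sys
import threading

import boto3
from boto3.s3.transfer import TransferConfig


class DownloadProgressPercentage(object):
    """Progress callback for downloads; size comes from S3, not local disk."""

    def __init__(self, filename, size):
        self._filename = filename
        self._size = float(size)
        self._seen_so_far = 0
        self._lock = threading.Lock()

    def __call__(self, bytes_amount):
        with self._lock:
            self._seen_so_far += bytes_amount
            percentage = (self._seen_so_far / self._size) * 100
            sys.stdout.write(
                "\r%s %s / %s (%.2f%%)" % (
                    self._filename, self._seen_so_far, self._size, percentage))
            sys.stdout.flush()


def download_with_progress(bucket_name, key, destination_path):
    s3 = boto3.client('s3')
    # head_object returns the object's metadata, including ContentLength in bytes
    size = s3.head_object(Bucket=bucket_name, Key=key)['ContentLength']
    config = TransferConfig(max_concurrency=8, use_threads=True)
    s3.download_file(bucket_name, key, destination_path,
                     Config=config,
                     Callback=DownloadProgressPercentage(destination_path, size))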