Download/upload files asynchronously from/to an S3 bucket
import os
import queue
import threading
import time

import boto3

class AsynchronousS3:
    """Asynchronously download/upload files from/to an AWS S3 bucket.

    Example:

    >>> from asynchronous_s3 import AsynchronousS3
    >>>
    >>> def my_success_callback(size, duration):
    ...     print(f'My {size} bytes file has been uploaded in {duration} sec.')
    ...
    >>> def upload_to_s3():
    ...     async_s3 = AsynchronousS3('my-bucket-name')
    ...     async_s3.upload_file(
    ...         'path/to_my/local_file',
    ...         'object/key',
    ...         on_success=my_success_callback,
    ...         on_failure=lambda error: print(error),
    ...     )
    ...     print('code to be executed...')
    ...
    >>> upload_to_s3()
    code to be executed...
    My 105673 bytes file has been uploaded in 5.3242523 sec.
    >>>
    """

    def __init__(self, s3_name, *args, **kwargs):
        """Class constructor.

        Arguments:
        s3_name -- Name of the bucket. Make sure your credentials are
            configured first (~/.aws/credentials).
        args -- extra positional arguments for the boto3.session.Session
            instance.

        Keyword arguments:
        kwargs -- extra keyword arguments for the boto3.session.Session
            instance.
        """
        session = boto3.session.Session(*args, **kwargs)
        service_resource = session.resource('s3')
        self.bucket = service_resource.Bucket(s3_name)
        # Finished transfer threads are pushed onto this queue; a daemon
        # thread joins them so they do not linger.
        self._io_threads_queue = threads_queue = queue.Queue()
        self._daemon = _S3Daemon(threads_queue)
        self._daemon.start()
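
    # A minimal construction sketch. The profile and region names below are
    # hypothetical; any keyword that boto3.session.Session accepts (such as
    # profile_name or region_name) can be forwarded through the constructor:
    #   async_s3 = AsynchronousS3('my-bucket', profile_name='dev',
    #                             region_name='eu-west-1')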

    def upload_file(self, local_path, key, on_success=None, on_failure=None,
                    **kwargs):
        """Upload a file from your computer to S3.

        Arguments:
        local_path -- Source path on your computer.
        key -- AWS S3 destination object key. More info:
            https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingMetadata.html

        Keyword arguments:
        on_success -- success callback to call. Given arguments will be:
            file_size and duration. Default is `None`: no callback is called.
        on_failure -- failure callback to call. Given arguments will be:
            error_message. Default is `None`: no callback is called.
        kwargs -- Extra kwargs for the standard boto3 Bucket `upload_file`
            method (see the commented example below this method).
        """
        # Run the blocking boto3 call in a dedicated thread.
        bucket = self.bucket
        method = bucket.upload_file
        thread = _S3Thread(
            method,
            on_success=on_success,
            on_failure=on_failure,
            threads_queue=self._io_threads_queue,
            Key=key,
            Filename=local_path,
            **kwargs,
        )
        thread.start()
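
    # Example call (hypothetical paths and keys; `ExtraArgs` is a keyword the
    # underlying boto3 upload_file accepts and is simply forwarded to it):
    #   async_s3.upload_file(
    #       'reports/today.csv',
    #       'reports/2023/today.csv',
    #       on_success=lambda size, duration: print(size, duration),
    #       ExtraArgs={'ContentType': 'text/csv'},
    #   )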

    def download_file(self, local_path, key, on_success=None, on_failure=None,
                      **kwargs):
        """Download a file from S3 to your computer.

        Arguments:
        local_path -- Destination path on your computer.
        key -- AWS S3 source object key. More info:
            https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingMetadata.html

        Keyword arguments:
        on_success -- success callback to call. Given arguments will be:
            file_size and duration. Default is `None`: no callback is called.
        on_failure -- failure callback to call. Given arguments will be:
            error_message. Default is `None`: no callback is called.
        kwargs -- Extra kwargs for the standard boto3 Bucket `download_file`
            method (see the commented example below this method).
        """
        bucket = self.bucket
        method = bucket.download_file
        thread = _S3Thread(
            method,
            on_success=on_success,
            on_failure=on_failure,
            threads_queue=self._io_threads_queue,
            Key=key,
            Filename=local_path,
            **kwargs,
        )
        thread.start()
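
    # Example call (hypothetical paths and keys; extra kwargs would be
    # forwarded to boto3's download_file the same way as for uploads):
    #   async_s3.download_file(
    #       '/tmp/report.csv',
    #       'reports/2023/today.csv',
    #       on_failure=lambda error: print(error),
    #   )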

    def exit(self):
        """Stop the internal daemon thread."""
        self._daemon.exit()

    def __del__(self):
        self.exit()

class _S3Thread(threading.Thread):
    """Run a single blocking boto3 transfer and report its outcome."""

    def __init__(self, method, on_success, on_failure, threads_queue,
                 *args, **kwargs):
        self._method = method
        self._on_success = on_success
        self._on_failure = on_failure
        self._threads_queue = threads_queue
        self._meth_args = args
        self._meth_kwargs = kwargs
        self._start_time = time.time()
        super().__init__()

    def run(self):
        method = self._method
        args = self._meth_args
        kwargs = self._meth_kwargs
        try:
            method(*args, **kwargs)
            self.success()
        except Exception as error:
            self.failed(error)

    def success(self):
        file_path = self._meth_kwargs['Filename']
        file_size = os.path.getsize(file_path)
        duration = time.time() - self._start_time
        self.stop(self._on_success, file_size, duration)

    def failed(self, error):
        self.stop(self._on_failure, error)

    def stop(self, callback, *args):
        # Fire the user callback (if any), then hand the finished thread
        # over to the daemon so it can be joined.
        if callback is not None:
            callback(*args)
        self._threads_queue.put(self)

class _S3Daemon(threading.Thread):
    """Join finished transfer threads pushed onto the queue."""

    def __init__(self, threads_queue):
        self._threads_queue = threads_queue
        self._running_event = threading.Event()
        self._running_event.set()
        super().__init__(daemon=True)

    def run(self):
        # Poll the queue for finished threads; the short sleep keeps the
        # loop cheap.
        while self._running_event.is_set():
            time.sleep(.1)
            try:
                thread = self._threads_queue.get_nowait()
            except queue.Empty:
                continue
            thread.join(.2)

    def exit(self):
        self._running_event.clear()
        self.join(.2)

if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser(description='Debugging pyaws.s3')
    parser.add_argument(
        '--mode',
        '-m',
        required=False,
        help='Transfer direction. Default is `down` (download); use `up` '
             'to upload.',
        type=str,
        default='down',
    )
    parser.add_argument(
        '--bucket-name',
        '-b',
        required=True,
        type=str,
        help='AWS S3 bucket name.',
    )
    parser.add_argument(
        '--file',
        '-f',
        required=True,
        type=str,
        help='Local file dst/src.',
    )
    parser.add_argument(
        '--object-key',
        '-k',
        required=True,
        type=str,
        help='S3 object key.',
    )
    args = parser.parse_args()

    mode = 'download' if args.mode.lower() != 'up' else 'upload'
    bucket_name = args.bucket_name
    file_path = args.file
    object_key = args.object_key

    def on_success(file_size, duration):
        print(f'A {file_size} bytes file has been {mode}ed in {duration} secs.')

    def on_failure(error):
        print('An error occurred: %s' % error)

    s3 = AsynchronousS3(bucket_name)

    if mode == 'download':
        s3.download_file(
            local_path=file_path,
            key=object_key,
            on_success=on_success,
            on_failure=on_failure,
        )
    else:
        s3.upload_file(
            local_path=file_path,
            key=object_key,
            on_success=on_success,
            on_failure=on_failure,
        )
    print('Code to be executed...')
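
    # Example invocation of this debug CLI (hypothetical bucket/file/key
    # names, using the flags defined above):
    #   python asynchronous_s3.py -m up -b my-bucket -f ./report.csv \
    #       -k reports/report.csv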