Skip to content

Instantly share code, notes, and snippets.

@suvojit-0x55aa
Created May 22, 2019 10:40
Show Gist options
  • Save suvojit-0x55aa/26f022bf31b6b045db43ff9bc34b162c to your computer and use it in GitHub Desktop.
Save suvojit-0x55aa/26f022bf31b6b045db43ff9bc34b162c to your computer and use it in GitHub Desktop.
from __future__ import division, print_function
import concurrent.futures as cf
from glob import glob
import logging
import os
from time import time, sleep
import boto3
class AWSUploader(object):
    '''
    Upload local files to an S3 bucket, optionally through a thread pool.

    Params:
        aws_bucket: A valid bucket name to be uploaded to.
        max_workers: If None, every upload runs synchronously in the calling
            thread. Otherwise the instance maintains its own thread pool
            with that many workers and each upload is queued on it.
    '''

    def __init__(self, aws_bucket, max_workers):
        self.max_workers = max_workers
        self.aws_bucket = aws_bucket
        self.log = logging.getLogger(__name__)
        # Always define the attribute so callers (and shutdown()) can test it
        # without resorting to hasattr().
        self.executor = None
        if self.max_workers is not None:
            self.executor = cf.ThreadPoolExecutor(
                max_workers=self.max_workers)
        print('Initiating {} with {} size ThreadPool, AWS bucket {}'.format(
            self.__class__.__name__, self.max_workers, self.aws_bucket))

    def uploadCallback(self, so_far):
        '''
        Progress hook handed to the S3 transfer; prints the byte count that
        the transfer reports on each invocation.
        '''
        print('{} bytes transferred'.format(so_far))

    def _uploadToAWS(self, filepath):
        '''
        Upload `filepath` to the bucket under its base name, unless an object
        with that key is already present.

        Returns:
            'file uploaded succesfully' when the object is in the bucket after
            the call (already there, or uploaded just now); None when the
            attempt failed. Failures are printed, never raised, so a single
            bad file does not abort a batch or kill a worker thread.
        '''
        key_name = None
        try:
            started = time()
            key_name = os.path.basename(filepath)
            # A fresh session per call (credentials come from the
            # environment). NOTE(review): boto3's guide says sessions and
            # resources should not be shared across threads, which is
            # presumably why this is not hoisted into __init__.
            session = boto3.session.Session()
            s3 = session.resource('s3')
            bucket = s3.Bucket(self.aws_bucket)
            try:
                # load() issues a HEAD request: it checks for the key without
                # opening a download stream for the object body.
                bucket.Object(key_name).load()
                already_there = True
            except s3.meta.client.exceptions.ClientError:
                # The service could not confirm the key (typically 404, or
                # 403 when listing is not permitted): attempt the upload.
                # Non-service errors (bad arguments, no connectivity) skip
                # the upload and fall through to the handler below.
                already_there = False
            if already_there:
                print("Key exist: {} | Time taken: {}".format(
                    key_name, time() - started))
            else:
                bucket.upload_file(
                    filepath, key_name, Callback=self.uploadCallback)
                print("AWS Upload with key: {} | Time taken: {}".format(
                    key_name, time() - started))
            return 'file uploaded succesfully'
        except Exception as e:
            # key_name is still None when basename() itself rejected the
            # argument; report the raw value rather than raising again here.
            print('{} upload failed for {}'.format(
                e, key_name if key_name is not None else filepath))
            return None

    def uploadToAWS(self, filepath):
        '''
        Wrapper function to call the uploader based on availability of
        Thread Pool Executor.

        Returns:
            The concurrent.futures.Future for the queued upload when a thread
            pool is configured, otherwise the result of the synchronous
            upload (see _uploadToAWS). Callers may ignore the return value.
        '''
        if self.max_workers is not None:
            return self.executor.submit(self._uploadToAWS, filepath)
        return self._uploadToAWS(filepath)

    def shutdown(self, wait=True):
        '''
        Release the thread pool, if this instance owns one.

        Params:
            wait: Forwarded to ThreadPoolExecutor.shutdown(); when True the
                call blocks until the queued uploads have finished.
        '''
        if self.executor is not None:
            self.executor.shutdown(wait=wait)
if __name__ == '__main__':
    # Demo entry point: push every JPEG under /static/image/ to the bucket.
    # Passing None as max_workers makes each upload run in this thread.
    uploader = AWSUploader('bucket', None)
    for image_path in glob('/static/image/*.jpg'):
        uploader.uploadToAWS(image_path)
    # NOTE(review): the source lost its indentation; this pause is assumed to
    # follow the loop (a grace period before exit) -- confirm.
    sleep(5)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment