Skip to content

Instantly share code, notes, and snippets.

@kjenney
Created November 21, 2018 02:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kjenney/068531ffe01e14bb7a2351dc55592551 to your computer and use it in GitHub Desktop.
Save kjenney/068531ffe01e14bb7a2351dc55592551 to your computer and use it in GitHub Desktop.
Boto3 Upload Failure
import os
import io
import json
import boto3
from boto3.exceptions import ResourceNotExistsError
from botocore.exceptions import ClientError
import pprint
import logging
import tempfile
from wand.image import Image
import magic
# Name of the S3 bucket whose images are optimized.
bucket = "super-secret-bucket-name"

# Resource-level handle: used for listing objects and downloading them.
s3 = boto3.resource('s3')
my_bucket = s3.Bucket(bucket)

# Low-level client: used for uploading the compressed result.
client = boto3.client('s3')

logger = logging.getLogger(__name__)

# Operations applied to every image; "compress" is the value assigned to
# Wand's `compression_quality` in image_compress().
ops = dict(compress=90)

# Extensions (without the leading dot) that is_image() accepts for processing
# and that is_lossy() treats as quality-adjustable.
# NOTE(review): png is normally a lossless format -- confirm it belongs here.
LOSSY_IMAGE_FMTS = ('jpg', 'jpeg', 'webp', 'png')
def is_lossy(ext, ops):
    """
    Tell whether an image should have a compression quality applied.

    The check passes when either the file extension `ext` or the requested
    output format `ops['fm']` (if present) is listed in LOSSY_IMAGE_FMTS.
    Matching is case-insensitive so that e.g. 'JPG' is recognised.

    Parameters
    ----------
    ext : str
        File extension without the leading dot.
    ops : dict
        Operations mapping; only the optional 'fm' key is consulted.

    Returns
    -------
    bool
        True if either format is in LOSSY_IMAGE_FMTS, False otherwise
    """
    candidates = [ext]
    if 'fm' in ops:
        candidates.append(ops['fm'])
    # Non-string values can never match an extension; skip them instead of
    # failing on .lower().
    return any(
        isinstance(fmt, str) and fmt.lower() in LOSSY_IMAGE_FMTS
        for fmt in candidates
    )
def is_image(file):
    """
    Tell whether `file` names an image this module knows how to process.

    The decision is based purely on the file extension, compared
    case-insensitively against LOSSY_IMAGE_FMTS (so 'photo.JPG' qualifies).

    Parameters
    ----------
    file : str
        Path or S3 key of the file.

    Returns
    -------
    bool
        True if the extension is in LOSSY_IMAGE_FMTS, False otherwise
    """
    ext = os.path.splitext(os.path.basename(file))[1]
    return ext.strip('.').lower() in LOSSY_IMAGE_FMTS
def get_mime_type(file_path):
    """
    Return the MIME type of the file at `file_path`, as reported by
    `magic.Magic(mime=True)`.
    """
    detector = magic.Magic(mime=True)
    mime_type = detector.from_file(file_path)
    return mime_type
def get_file_size(file_path):
    """Return the size in bytes of the file at `file_path`."""
    file_stat = os.stat(file_path)
    return file_stat.st_size
def get_folder(file):
    """
    Return the directory part of `file`, or '/' when `file` contains no
    slash at all.
    """
    has_separator = '/' in file
    return os.path.dirname(file) if has_separator else '/'
def download(bucket, key):
    """
    Download the file from s3 given `bucket` and `key` into a temporary file.

    Parameters
    ----------
    bucket : str or Bucket resource
        Bucket to read from, given either as a bucket name or as an object
        exposing `download_file(key, filename)` such as `s3.Bucket(...)`.
    key : str
        Key of the object to download. Its extension is kept on the
        temporary file.

    Returns
    -------
    str
        the filename of the downloaded file or None if the download raised
        a ClientError (e.g. the file was not found)
    """
    basename = os.path.basename(key)
    ext = os.path.splitext(basename)[1]
    print('Getting ready to download ' + basename)
    logger.info('Downloading file from s3 at {}'.format(basename))

    # mkstemp() creates the file and returns an *open* descriptor. Ask for the
    # extension as a suffix (appending it afterwards left the created file
    # orphaned) and close the descriptor immediately: the download reopens
    # the file by path.
    fd, tmp_file = tempfile.mkstemp(suffix=ext)
    os.close(fd)
    print("This is what we're calling our temp file: " + tmp_file)

    source = s3.Bucket(bucket) if isinstance(bucket, str) else bucket
    try:
        source.download_file(key, tmp_file)
    except ClientError:
        logger.exception('File not found: {}'.format(basename))
        # Do not leave the empty placeholder behind when nothing was fetched.
        if os.path.exists(tmp_file):
            os.remove(tmp_file)
        return None
    return tmp_file
def upload(bucket, folder, filename):
    """
    Upload the local file `filename` to s3.

    The object key is the file's basename, prefixed with `folder` unless the
    folder is empty or the root marker '/'. The detected MIME type is stored
    as the object's Content-Type.

    Parameters
    ----------
    bucket : str or Bucket resource
        Destination bucket, given either as a bucket name or as an object
        with a `name` attribute such as `s3.Bucket(...)`.
    folder : str
        Key prefix to upload under; '' or '/' means the bucket root.
    filename : str
        Path of the local file to upload.
    """
    print('Getting ready to upload ' + filename)
    logger.info('Uploading file to s3 at {}'.format(filename))
    basename = os.path.basename(filename)
    mime_type = get_mime_type(filename)

    # client.upload_file() takes the bucket *name*; handing it a Bucket
    # resource object is what made the original call fail.
    bucket_name = bucket if isinstance(bucket, str) else bucket.name

    prefix = folder.strip('/') if folder else ''
    key = '{}/{}'.format(prefix, basename) if prefix else basename

    print('File to Upload, filename: ' + filename)
    print("Mime Type: " + mime_type)
    print('Name in Bucket, basename: ' + basename)
    print('Destination: s3://{}/{}'.format(bucket_name, key))
    client.upload_file(
        filename,
        bucket_name,
        key,
        ExtraArgs={'ContentType': mime_type},
    )
def image_compress(filename, ops):
    """
    Compress the image specified by `filename` using the transformations specified by `ops` (operations)

    The result is written to a new temporary file carrying the same
    extension as `filename`; the input file is not modified.

    Parameters
    ----------
    filename : str
        Path of the image to compress.
    ops : dict
        Operations mapping. 'compress' (optional) is converted to int and
        assigned to the image's `compression_quality` when is_lossy() accepts
        the image. Without it the quality is left at the library's default;
        the original fallback name DEFAULT_QUALITY_RATE was never defined
        and raised NameError.

    Returns
    -------
    str
        the filename of the transformed image
    """
    print("Let's compress " + filename)
    ext = os.path.splitext(os.path.basename(filename))[1].strip('.')

    # mkstemp() returns an open descriptor plus the path of a file it has
    # already created. Create that file with the final extension and close
    # the descriptor right away; the image is saved to the path below.
    suffix = '.' + ext if ext else ''
    fd, output = tempfile.mkstemp(suffix=suffix)
    os.close(fd)

    try:
        with Image(filename=filename) as src:
            with src.clone() as img:
                if 'compress' in ops and is_lossy(ext, ops):
                    img.compression_quality = int(ops['compress'])
                img.save(filename=output)
    except Exception:
        # Nothing usable was produced: drop the placeholder, then re-raise.
        if os.path.exists(output):
            os.remove(output)
        raise
    return output
def optimize(file, folder):
    """
    Download the object `file` from `my_bucket`, compress it with the
    module-level `ops`, and upload the result under `folder`.

    Both temporary files (the download and the compressed output) are
    removed afterwards, whether or not compression and upload succeeded.

    Parameters
    ----------
    file : str
        Key of the object to optimize.
    folder : str
        Folder passed through to upload().

    Returns
    -------
    str or None
        'Error' if the download failed, otherwise None
    """
    img_filename = download(my_bucket, file)
    if not img_filename:
        return 'Error'

    output_img = None
    try:
        output_img = image_compress(img_filename, ops)
        print('Compressed ' + img_filename + ' to ' + output_img)
        upload(my_bucket, folder, output_img)
    finally:
        # Temp files are never reused; delete them so repeated runs do not
        # fill the temp directory.
        for path in (img_filename, output_img):
            if path and os.path.exists(path):
                os.remove(path)
def process(event, context):
    """
    Optimize every image object found in `my_bucket`.

    Parameters
    ----------
    event, context
        Unused. The signature matches the AWS Lambda handler convention
        (presumably this is deployed as one -- confirm).

    Returns
    -------
    str
        'Done' once every image key has been handed to optimize()
    """
    # Snapshot the keys before doing any work: optimize() uploads new objects
    # into this same bucket, and the listing is fetched lazily, so iterating
    # it directly could pick up the freshly uploaded outputs.
    image_keys = [
        obj.key for obj in my_bucket.objects.all() if is_image(obj.key)
    ]
    for key in image_keys:
        optimize(key, get_folder(key))
    return 'Done'
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment