Skip to content

Instantly share code, notes, and snippets.

@boffbowsh
Created November 28, 2019 08:33
Show Gist options
  • Save boffbowsh/94dd36cf31fa74217cc9c72d9fd9a8b0 to your computer and use it in GitHub Desktop.
Save boffbowsh/94dd36cf31fa74217cc9c72d9fd9a8b0 to your computer and use it in GitHub Desktop.
import base64
from io import BytesIO
from os.path import basename, splitext, sep
import re
import sys
import boto3
from PIL import Image, ImageOps
client = boto3.client('s3')
def handle(event, context):
if "s3" in event["Records"][0]:
return handle_s3(event, context)
elif "cf" in event["Records"][0]:
return handle_cf(event, context)
def handle_cf(event, context):
r = event["Records"][0]
response = r["cf"]["response"]
if response["status"] == "200":
return response
uri = r["cf"]["request"]["uri"]
if not uri.startswith('/thumbs'):
return response
# ['', 'thumbs', '300', 'crop=center', 'leaflets', 'foo.jpg']
parts = uri.split(sep)
parts.pop(0); parts.pop(0)
size = int(parts.pop(0))
options = {}
while '=' in parts[0]:
part = parts.pop(0)
k, v = part.split('=')
options[k] = v
path = 'raw_leaflets/{}'.format(sep.join(parts))
image = fetch_image('data.electionleaflets.org', path)
processed = process_image(image, (size, options))
key = new_key((size, options), path)
upload_image(processed, image.format, 'data.electionleaflets.org', key)
io = BytesIO()
processed.save(io, image.format)
response['headers']['content-type'] = [{'key': 'Content-Type', 'value': Image.MIME[image.format]}]
response['body'] = re.sub(r'\n', '', base64.encodebytes(io.getvalue()).decode('ascii'))
response['bodyEncoding'] = 'base64'
response['status'] = '200'
print(response)
return response
def handle_s3(event, context):
SPECS = (
(350, {"crop": "top"}),
(150, {"crop": "noop"}),
(1000, {"crop": "noop"}),
(600, {"crop": "center"}),
(350, {}),
(600, {"crop": "center"}),
(600, {"crop": "center"}),
)
for r in event["Records"]:
if not r["s3"]["object"]["key"].startswith('raw_leaflets/'):
continue
image = fetch_image(r["s3"]["bucket"]["name"], r["s3"]["object"]["key"])
for spec in SPECS:
processed = process_image(image, spec)
key = new_key(spec, r["s3"]["object"]["key"])
upload_image(processed, image.format, r["s3"]["bucket"]["name"], key)
def fetch_image(bucket: str, key: str):
response = client.get_object(Bucket=bucket, Key=key)
return Image.open(response['Body'])
def new_key(spec: tuple, key: str) -> str:
size = spec[0]
options = spec[1]
option_parts = [str(size)]
option_parts.extend(sorted(['='.join(o) for o in options.items()]))
base = re.sub(r'^raw_leaflets/', '', key)
return "thumbs/{}/{}".format('/'.join(option_parts), base)
def process_image(image: Image, spec: str) -> Image:
size = spec[0]
options = spec[1]
print(spec)
if "crop" not in options or options["crop"] == "noop":
new = image.copy()
new.thumbnail((size, size))
elif "crop" in options:
if options["crop"] == "top":
centering = (0.5, 0)
elif options["crop"] == "center":
centering = (0.5, 0.5)
new = ImageOps.fit(image, (size, size), centering=centering)
return new
def upload_image(image: Image, format: str, bucket: str, key: str):
io = BytesIO()
image.save(io, format)
client.put_object(
ACL='public-read',
Body=io.getvalue(),
Bucket=bucket,
Key=key,
ContentType=Image.MIME[format],
)
if __name__ == "__main__":
if sys.argv[1] == '--s3':
handle_s3({
"Records": [
{
"s3": {
"bucket": {"name": "data.electionleaflets.org"},
"object": {"key": "raw_leaflets/leaflets/0439E661-EA19-482F-B11A-454038DE148A.jpeg"}
}
}
]
}, {})
elif sys.argv[1] == '--cf':
r = handle_cf({
"Records": [
{
"cf": {
"request": {"uri": "/thumbs/300/crop=top/leaflets/0439E661-EA19-482F-B11A-454038DE148A.jpeg"},
"response": {"status": "404", "headers": {}}
}
}
]
}, {})
print(r)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment