Skip to content

Instantly share code, notes, and snippets.

@Burekasim
Created October 7, 2020 15:01
Show Gist options
  • Star 6 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save Burekasim/46aa5bc696160c4d33013ab1d0ae9784 to your computer and use it in GitHub Desktop.
Save Burekasim/46aa5bc696160c4d33013ab1d0ae9784 to your computer and use it in GitHub Desktop.
Access S3 objects through an ALB using a Lambda function
import base64
import json
import os

import boto3
from botocore.exceptions import ClientError
# Lambda output is limited to 1 MB when invoked from an ALB, so anything at or
# above this many bytes is refused instead of being sent.
_ALB_MAX_BODY_BYTES = 1000000


def _error_response(status_code, reason, message, status='error'):
    """Build an ALB-compatible response carrying a JSON error body.

    Args:
        status_code: HTTP status code to report, e.g. ``404``.
        reason: Reason phrase used in ``statusDescription``, e.g. ``'Not Found'``.
        message: JSON-serialisable detail stored under the ``message`` key.
        status: Short label stored under the ``status`` key of the body.

    Returns:
        A response dict with ``isBase64Encoded``, ``statusCode``,
        ``statusDescription``, ``headers`` and ``body`` keys.
    """
    return {
        "isBase64Encoded": False,
        "statusCode": status_code,
        "statusDescription": f"{status_code} {reason}",
        "headers": {"Content-Type": "application/json"},
        "body": json.dumps({'status': status, 'message': message}, indent=4),
    }


def lambda_handler(event, context):
    """Serve an S3 object in response to an ALB request.

    The first segment of the request path is dropped and the remainder is used
    as the object key inside the bucket named by the ``BUCKET_NAME``
    environment variable. ELB health-check requests are answered directly
    without touching S3.

    Args:
        event: ALB request event; ``path`` and ``headers`` are read from it.
        context: Lambda context object (unused).

    Returns:
        An ALB response dict. On success the body is the object content, sent
        as text when it is valid UTF-8 and base64-encoded otherwise. Failures
        produce a JSON body with ``status`` and ``message`` keys and a
        non-200 status code.

    Raises:
        KeyError: If ``AWS_REGION`` or ``BUCKET_NAME`` is not set.
    """
    # use 'AWS_REGION' environment variable from lambda built-in variables
    aws_region = os.environ['AWS_REGION']
    bucket_name = os.environ['BUCKET_NAME']

    # Tolerate events without a 'headers' mapping instead of raising KeyError.
    request_headers = event.get('headers') or {}
    if request_headers.get('user-agent') == 'ELB-HealthChecker/2.0':
        return {
            "isBase64Encoded": False,
            "statusCode": 200,
            "statusDescription": "200 OK",
            "headers": {"Content-Type": 'text/html'},
            "body": 'Response to HealthCheck',
        }

    # '/<prefix>/<key...>' -> '<key...>': index 0 is the empty string before
    # the leading slash and index 1 is the first path segment.
    file_path = '/'.join(event.get('path', '').split('/')[2:])
    if not file_path:
        # Nothing to look up; answer without calling S3 with an empty key.
        return _error_response(400, 'Bad Request', 'no object key in request path')

    s3 = boto3.resource('s3', region_name=aws_region)
    try:
        # One GetObject call supplies the size, the headers and the body;
        # every additional get() would be another request to S3.
        response = s3.Object(bucket_name, file_path).get()
        stream = response['Body']
        try:
            object_size = response['ContentLength']
            if object_size >= _ALB_MAX_BODY_BYTES:
                object_size_in_mb = '{:.2f}'.format(object_size / 1024 / 1024)
                # 4xx rather than 5xx: the condition is permanent for this
                # object, so retrying the request cannot succeed.
                return _error_response(
                    413,
                    'Payload Too Large',
                    f'object size is {object_size_in_mb}Mb, lambda with alb is limited to 1Mb',
                )
            raw = stream.read()
        finally:
            # Release the HTTP connection even when the body is never read.
            stream.close()
    except ClientError as e:
        error_code = e.response.get('Error', {}).get('Code')
        if error_code in ('NoSuchKey', '404'):
            return _error_response(404, 'Not Found', [str(e)])
        if error_code in ('AccessDenied', '403'):
            return _error_response(403, 'Forbidden', [str(e)])
        return _error_response(500, 'Internal Server Error', [str(e)])
    except Exception as e:
        return _error_response(
            500, 'Internal Server Error', [str(e)], status='unknown exception'
        )

    # The response body must be a string: pass UTF-8 text through unchanged
    # and base64-encode everything else so binary content survives intact.
    try:
        body = raw.decode('utf-8')
        is_base64 = False
    except UnicodeDecodeError:
        body = base64.b64encode(raw).decode('ascii')
        is_base64 = True
        if len(body) >= _ALB_MAX_BODY_BYTES:
            # base64 grows the payload by about a third, which can push an
            # object that passed the raw-size check over the limit.
            return _error_response(
                413,
                'Payload Too Large',
                f'object is {len(body)} bytes once base64-encoded, lambda with alb is limited to 1Mb',
            )

    # Forward the object's own HTTP metadata under real (hyphenated) header names.
    candidate_headers = {
        'Content-Type': response.get('ContentType', 'application/octet-stream'),
        'Content-Language': response.get('ContentLanguage'),
        'Cache-Control': response.get('CacheControl'),
        'Content-Disposition': response.get('ContentDisposition'),
        'Content-Encoding': response.get('ContentEncoding'),
    }
    return {
        "isBase64Encoded": is_base64,
        "statusCode": 200,
        "statusDescription": "200 OK",
        # Remove empty headers to avoid 502 errors from ALB.
        "headers": {k: v for k, v in candidate_headers.items() if v is not None},
        "body": body,
    }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment