Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save AntonioFeijaoUK/c1c9649d8ce49927aa0a4454262a607f to your computer and use it in GitHub Desktop.
Save AntonioFeijaoUK/c1c9649d8ce49927aa0a4454262a607f to your computer and use it in GitHub Desktop.
function-to-email-notification-for-new-file-uploaded.py
print('Loading function')
# source - https://boto3.amazonaws.com/v1/documentation/api/latest/guide/s3-presigned-urls.html
import logging
import boto3
from botocore.exceptions import ClientError
import json
import os
sns_topic_arn = os.environ['sns_topic_arn']
s3_client = boto3.client('s3')
sns_client = boto3.client('sns')
def create_presigned_url(bucket_name, object_name, expiration=3600):
"""Generate a presigned URL to share an S3 object
:param bucket_name: string
:param object_name: string
:param expiration: Time in seconds for the presigned URL to remain valid
:return: Presigned URL as string. If error, returns None.
"""
# Generate a presigned URL for the S3 object
#s3_client = boto3.client('s3')
try:
response = s3_client.generate_presigned_url('get_object',
Params={'Bucket': bucket_name,
'Key': object_name},
ExpiresIn=expiration)
except ClientError as e:
logging.error(e)
return None
# The response contains the presigned URL
return response
# useful links
# https://github.com/awsdocs/aws-doc-sdk-examples/blob/main/python/example_code/s3/s3_basics/presigned_url.py
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.generate_presigned_url
# https://boto3.amazonaws.com/v1/documentation/api/latest/guide/s3-presigned-urls.html
# https://docs.aws.amazon.com/lambda/latest/dg/with-s3-tutorial.html
def lambda_handler(event, context):
#print("Received event: " + json.dumps(event, indent=2))
#print('This is sns_topic_arn: {}.'.format(sns_topic_arn))
# Get the object from the event and show its content type
bucket_name = event['Records'][0]['s3']['bucket']['name']
#key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
object_name = event['Records'][0]['s3']['object']['key']
try:
#response = s3_client.get_object(Bucket=bucket, Key=key)
#print("CONTENT TYPE: " + response['ContentType'])
print('Object name `{}` from bucket `{}`.'.format(object_name, bucket_name))
#url = s3_client.generate_presigned_url(
# ClientMethod='get_object',
# Params={'Bucket': bucket, 'Key': key},
# ExpiresIn='25200' #7 days
#)
#print('This is the result URL - {} '.format(url))
#presigned_url_for_email = '<a href="{}">Your private link for download</a>'.format(url)
#print(presigned_url_for_email)
url = create_presigned_url(bucket_name, object_name)
print(url)
response = sns_client.publish(
TopicArn=sns_topic_arn,
Message='Notification for bucket {}. \nA new file name {} was added. \n\nHere is the link for download: \n\n {} \n\nPlease let us know if you need a new link as for security they expire after 1 hour. \nThank you. \nGEL fantastic staff.'.format(bucket_name, object_name, url),
Subject='Bucket notification - new file {}'.format(object_name)
)
return 0
#response['ContentType']
except Exception as e:
print(e)
print('Error getting object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(key, bucket))
raise e
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment