Skip to content

Instantly share code, notes, and snippets.

@unacceptable
Last active September 30, 2020 23:58
Show Gist options
  • Save unacceptable/452d184ccacc42398d5d3ff2328cf8a2 to your computer and use it in GitHub Desktop.
Save unacceptable/452d184ccacc42398d5d3ff2328cf8a2 to your computer and use it in GitHub Desktop.
Replay S3 event triggers.
#!/usr/bin/env python3
# Written by: Robert J.
# Email: robert@aztek.io
import os
import sys
import logging
import json
import boto3
from datetime import date, datetime
import re
#######################################
### Logging Settings ##################
#######################################
# Root logger (getLogger() with no name) at INFO; when run as a script,
# configure_logging() attaches handlers to this same object.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
#######################################
### Global Variables ##################
#######################################
S3 = boto3.client('s3')
LAMBDA = boto3.client('lambda')
# Lambda function that receives the replayed events.
FUNCTION_NAME = os.environ.get('FUNCTION_NAME', 'my_function')
# Optional pattern; when set, only keys matching it (via re.search) are replayed.
KEY_REGEX = os.environ.get('KEY_REGEX', None)
# Environment variables are always strings, so bool() of the raw value is True
# for ANY non-empty value -- including "false" and "0". Parse explicitly:
# unset/empty or an explicit false-like spelling disables dry-run; any other
# value keeps it enabled (the safer side for a flag guarding real invocations).
_DRY_RUN_FALSE_VALUES = ('', '0', 'false', 'no', 'off')
DRY_RUN = os.environ.get('DRY_RUN', '').strip().lower() not in _DRY_RUN_FALSE_VALUES
#######################################
### Main Function #####################
#######################################
def main(event=None, context=None):
    """Re-send a synthetic S3 event for every object in the buckets named by *event*.

    For each record in ``event["Records"]`` the bucket name is read, its keys
    are listed (optionally filtered by KEY_REGEX), and one mock event per key
    is sent to FUNCTION_NAME. When DRY_RUN is set, the invocations use the
    'DryRun' invocation type; otherwise they use 'Event'.

    Args:
        event: Dict with a "Records" list whose items carry
            ``["s3"]["bucket"]["name"]``. A missing/None event or an empty
            "Records" list means there is nothing to replay.
        context: Presumably the Lambda context object; unused here.

    Ends via error_logic(), which exits with a non-zero status if any
    invocation failed.
    """
    logger.info('Starting Script')
    logger.info('DRY_RUN: {}'.format(DRY_RUN))
    invocation_type = 'DryRun' if DRY_RUN else 'Event'

    # Previously main() with its default event=None crashed with a TypeError
    # on event["Records"]; treat a missing event/records as "nothing to do".
    records = (event or {}).get('Records') or []
    if not records:
        logger.warning('No records found in event; nothing to replay.')

    error_count = 0
    for record in records:
        bucket = record["s3"]["bucket"]["name"]
        for mock_event in create_mock_events(bucket, get_keys(bucket)):
            error_count += parse_mock_event(mock_event, invocation_type=invocation_type)
    error_logic(error_count)
#######################################
### Generic Functions #################
#######################################
def fatal(message, code=1):
    """Log *message* as CRITICAL, note the shutdown, and terminate.

    Args:
        message: What to log -- callers pass both strings and exception
            instances.
        code: Exit status handed to SystemExit (default 1).

    Raises:
        SystemExit: always; this function never returns normally.
    """
    logger.critical(message)
    logger.info('Exiting Application')
    # Equivalent to sys.exit(code): sys.exit simply raises SystemExit(code).
    raise SystemExit(code)
def configure_logging(stdout=False, filename=None, log_format=None):
    """Attach stdout and/or file handlers to the module logger.

    Args:
        stdout: When true, add a StreamHandler writing to sys.stdout.
        filename: When given, add a FileHandler for this path (opened in
            logging's default append mode).
        log_format: logging.Formatter format string; defaults to
            "%(asctime)s - %(levelname)s - %(message)s".

    If the log file cannot be opened for any OS-level reason, the error is
    reported through fatal(), which terminates the process.
    """
    formatter = logging.Formatter(
        log_format or "%(asctime)s - %(levelname)s - %(message)s"
    )
    if stdout:
        s_handler = logging.StreamHandler(sys.stdout)
        s_handler.setFormatter(formatter)
        logger.addHandler(s_handler)
        logger.debug('Configured logging to stdout.')
    if filename:
        try:
            f_handler = logging.FileHandler(filename)
        except OSError as e:
            # PermissionError was the only case handled before; a missing
            # parent directory (FileNotFoundError) or a directory path
            # (IsADirectoryError) are OSError subclasses too and deserve the
            # same clean exit instead of a raw traceback.
            fatal(e)
        else:
            f_handler.setFormatter(formatter)
            logger.addHandler(f_handler)
            logger.debug('Configured logging for {}'.format(filename))
def custom_json_parser(value):
    """``default=`` hook for json.dumps that serializes dates as ISO-8601.

    Args:
        value: The object json could not serialize on its own.

    Returns:
        ``value.isoformat()`` for date/datetime instances.

    Raises:
        TypeError: for any other type, as json.dumps expects from a hook.
    """
    if not isinstance(value, (datetime, date)):
        raise TypeError('Type "{}" not serializable'.format(type(value)))
    return value.isoformat()
def error_logic(error_count):
    """Report the error total and exit non-zero if anything failed.

    Args:
        error_count: Number of failed invocations.

    The exit status is error_count clamped to 255 (presumably because
    process exit statuses are limited to 8 bits). When error_count is 0 the
    function simply returns.

    Raises:
        SystemExit: when error_count > 0.
    """
    logger.info('Total Errors: {}'.format(error_count))
    exit_code = min(error_count, 255)
    logger.info('Exiting Script')
    # Only exit on failure -- lambda doesn't like sys.exit(0).
    if exit_code > 0:
        sys.exit(exit_code)
#######################################
### Program Specific Functions ########
#######################################
def get_keys(bucket):
    """Return the keys of every object in *bucket*, filtered by KEY_REGEX if set.

    Pages through S3 ``list_objects_v2``, following ``NextContinuationToken``
    until S3 stops returning one.

    Args:
        bucket: Name of the bucket to list.

    Returns:
        List of key strings, in the order S3 returned them. Empty for an
        empty bucket.
    """
    # Compile once instead of re-parsing the pattern for every key.
    pattern = re.compile(KEY_REGEX) if KEY_REGEX else None
    all_keys = []
    request = {'Bucket': bucket}
    logger.info('Retrieving list of objects from {}.'.format(bucket))
    while True:
        response = S3.list_objects_v2(**request)
        # S3 omits 'Contents' entirely when a page holds no objects (e.g. an
        # empty bucket); indexing it directly raised KeyError.
        keys = [obj['Key'] for obj in response.get('Contents', [])]
        if pattern is not None:
            keys = [key for key in keys if pattern.search(key)]
        all_keys.extend(keys)

        token = response.get('NextContinuationToken')
        if not token:
            break
        logger.info('Continuation token found.')
        request['ContinuationToken'] = token
    logger.debug('S3 Objects: {}'.format(all_keys))
    return all_keys
def create_mock_events(bucket, keys):
    """Build one minimal S3-notification-shaped event per key.

    Each event has a single record containing only the bucket name and the
    object key; parse_mock_event reads the key back out of
    ``["Records"][0]["s3"]["object"]["key"]``.

    NOTE(review): real S3 notifications URL-encode the object key, while
    these mock events carry the raw key. Confirm the target function copes
    with keys containing spaces, '+' or '%'.

    Args:
        bucket: Bucket name placed in every record.
        keys: Iterable of object keys.

    Returns:
        List of event dicts, in the same order as *keys*.
    """
    def _event_for(key):
        record = {
            "s3": {
                "bucket": {"name": bucket},
                "object": {"key": key},
            }
        }
        return {"Records": [record]}

    mock_events = list(map(_event_for, keys))
    logger.debug('Mock events: {}'.format(json.dumps(mock_events, indent=5)))
    return mock_events
def parse_mock_event(mock_event, invocation_type='DryRun'):
    """Invoke FUNCTION_NAME with *mock_event* as its JSON payload.

    Args:
        mock_event: Single-record event dict as built by create_mock_events.
        invocation_type: Lambda InvocationType; main() passes 'DryRun' or
            'Event'.

    Returns:
        0 if the invoke call did not raise, 1 if it raised an ordinary
        error. A 0 only means the API call succeeded; the response is not
        inspected, so it says nothing about the target function's own result.

    Exits the process via fatal() on KeyboardInterrupt or when Lambda reports
    ResourceNotFoundException.
    """
    key = mock_event['Records'][0]['s3']['object']['key']
    payload = json.dumps(mock_event).encode('utf-8')
    logger.info('Parsing {}'.format(key))
    logger.debug('Invoking {}'.format(FUNCTION_NAME))
    logger.debug('Payload {}'.format(payload))
    try:
        LAMBDA.invoke(
            FunctionName=FUNCTION_NAME,
            InvocationType=invocation_type,
            Payload=payload,
        )
    except KeyboardInterrupt as e:
        print('\n')
        fatal('Received Keyboard Interrupt {}'.format(e))
    except LAMBDA.exceptions.ResourceNotFoundException as e:
        fatal(e)
    except Exception:
        # Was a bare `except:`, which also swallowed SystemExit/GeneratorExit
        # and discarded the cause. Count this key as failed, keep going, and
        # log the traceback so the failure can be diagnosed.
        logger.exception('Unable to invoke lambda for payload {}.'.format(key))
        return 1
    return 0
#######################################
### Execution #########################
#######################################
if __name__ == '__main__':
    # Local run: log to stdout and replay every object in the listed buckets
    # by handing main() an S3-style event with one record per bucket.
    configure_logging(stdout=True)
    _BUCKETS_TO_REPLAY = (
        "first-bucket-to-process",
        "second-bucket-to-process",
    )
    event = {
        "Records": [
            {"s3": {"bucket": {"name": name}, "object": {"key": ""}}}
            for name in _BUCKETS_TO_REPLAY
        ]
    }
    try:
        main(event=event)
    except KeyboardInterrupt as e:
        fatal('Evaluating Keyboard interrupt: {}'.format(e))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment