Skip to content

Instantly share code, notes, and snippets.

@davidrosenstark
Last active June 2, 2020 12:22
Show Gist options
  • Save davidrosenstark/3d1e87638556f043c3d0dbeb1e21f690 to your computer and use it in GitHub Desktop.
Save davidrosenstark/3d1e87638556f043c3d0dbeb1e21f690 to your computer and use it in GitHub Desktop.
error_log_retriever
import json
import logging
import os
from datetime import timedelta
import boto3
from dateutil.parser import parse
environment = os.environ.get("ENV")
logger = logging.getLogger(__name__)
client = boto3.client('logs')
sns_client = boto3.client('sns')
#How many events to send to notification so the message is not too large
MAX_NUM_EVENTS = 3
sns_arn = os.environ.get("NOTIFY_ARN")
def get_log_group_name(message):
if message['Trigger']['Namespace'] == 'AWS/Lambda':
# genericize for dimension FunctionName
func_name = list(filter(lambda x: x['name'] == 'FunctionName', message['Trigger']['Dimensions']))[0]['value']
return "/aws/lambda/" + func_name
else:
raise Exception("not an alarm related to log filters")
def send_filter_log_events_request(log_group_name, alarm_time, nextToken=None):
args = {'logGroupName': log_group_name, 'endTime': int((alarm_time.timestamp()) * 1000),
'startTime': int((alarm_time - timedelta(minutes=10)).timestamp() * 1000),
'filterPattern': "?ERROR ?\"Task timed out\"",
'interleaved': True
}
if nextToken is not None:
args['nextToken'] = nextToken
return client.filter_log_events(**args)
def get_logs(log_group_name, alarm_time):
response = send_filter_log_events_request(log_group_name, alarm_time)
# remove garbage from the logs
events = [e['message'].replace("\u00a0"," ") for e in response['events']]
while 'nextToken' in response:
response = send_filter_log_events_request(log_group_name, alarm_time, response['nextToken'])
events += [e['message'].replace("\u00a0"," ") for e in response['events']]
return events
def lambda_handler(event, _):
logger.debug(f"incoming message: {event['Records']}")
for record in event['Records']:
try:
sns_message = record['Sns']
alarm_time = parse(sns_message['Timestamp'])
message = json.loads(sns_message['Message'])
log_group_name = get_log_group_name(message)
logs = get_logs(log_group_name, alarm_time)
if len(logs) > 0:
alert_message = {'LambdaWithErrors': log_group_name[log_group_name.rfind("/")+1:],
'num_errors': len(logs),
'error_logs': logs[:5]}
logger.debug(f"sending alert message: {alert_message}")
sns_client.publish(TopicArn=sns_arn, Message=json.dumps(alert_message))
else:
logger.warning("Failed to retrieve log records for error")
except Exception as ex:
logger.info(f"exception: {ex}")
logger.info(f"not processing: {record}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment