Skip to content

Instantly share code, notes, and snippets.

@mig5
Created July 23, 2020 04:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mig5/e3ff9c6ab08be0a2b342f22b77899563 to your computer and use it in GitHub Desktop.
Save mig5/e3ff9c6ab08be0a2b342f22b77899563 to your computer and use it in GitHub Desktop.
# Copyright (c) 2016 Amazon Web Services, Inc.
from __future__ import print_function
import boto3
import json
import datetime
# Name of the SNS topic that findings are published to; lambda_handler
# creates it on demand if it does not already exist.
SNS_TOPIC = "Inspector-Finding-Delivery"
# Email address that receives the findings; lambda_handler subscribes it to
# the topic if it is not subscribed yet.
DEST_EMAIL_ADDR = "you@example.com"

# boto3 service clients, created once when the module is imported.
inspector = boto3.client('inspector')
sns = boto3.client('sns')
def enco(obj):
    """Serialize date values for ``json.dumps(..., default=enco)``.

    Returns the ISO-8601 string of a ``datetime.date`` or
    ``datetime.datetime`` and ``None`` for any other object.
    """
    # datetime.datetime is a subclass of datetime.date, so a single
    # isinstance check covers both types.
    if isinstance(obj, datetime.date):
        return obj.isoformat()
    return None
# Finding titles that are never worth an email.
_IGNORED_TITLES = (
    "Unsupported Operating System or Version",
    "No potential security issues found",
)

# Servers that are expected to expose public ports.
_PUBLIC_SERVERS = (
    'webapps (Prod)',
    'vpn.foobar.com',
    'brochure-pet-server.foobar.com',
    'public-staging-site-pw-protected.foobar.com',
)
# The only server on which the UDP whitelist applies.
_VPN_SERVER = 'vpn.foobar.com'

# Port attribute values (single ports and range notation) that are expected.
_TCP_ALLOWED = ('80', '443', '[[80 - 80], [443 - 443]]')
_UDP_ALLOWED = ('1194', '[[1194 - 1194]]')

# Finding attribute keys whose value describes open ports.
_PORT_ATTRIBUTE_KEYS = ('TCP_PORTS', 'UDP_PORTS', 'PORT')


def _server_name_from_tags(finding):
    """Return the value of the asset's ``Name`` tag, or "" if there is none."""
    server_name = ""
    # An asset without tags must not crash the handler.
    for tag in finding.get('assetAttributes', {}).get('tags', []):
        if tag['key'] == 'Name':
            server_name = tag.get('value', "")
    return server_name


def _unexpected_ports(finding, server_name, title):
    """Return the port attribute values of *finding* that are not whitelisted.

    A value is whitelisted when it is in the TCP list, or when it is in the
    UDP list and *server_name* is the VPN server.  One log line is printed
    per whitelisted value.
    """
    port_values = [
        attribute['value']
        for attribute in finding.get('attributes', [])
        if attribute['key'] in _PORT_ATTRIBUTE_KEYS and attribute['value'] != "[]"
    ]
    # Build a new list instead of removing from the list being iterated:
    # removing in place skips the element that follows each removal, which
    # left allowed ports behind and caused spurious reports.
    unexpected = []
    for value in port_values:
        udp_ok = value in _UDP_ALLOWED and server_name == _VPN_SERVER
        if value in _TCP_ALLOWED or udp_ok:
            print('Skipping finding: ', title)
        else:
            unexpected.append(value)
    return unexpected


def _is_subscribed(topic_arn, endpoint):
    """Return True when *endpoint* is already subscribed to *topic_arn*."""
    request = {'TopicArn': topic_arn}
    while True:
        page = sns.list_subscriptions_by_topic(**request)
        if any(sub['Endpoint'] == endpoint for sub in page['Subscriptions']):
            return True
        if 'NextToken' not in page:
            return False
        # NextToken is only sent once the service has actually returned one.
        request['NextToken'] = page['NextToken']


def lambda_handler(event, context):
    """Forward a newly reported Amazon Inspector finding by email via SNS.

    Args:
        event: Lambda event carrying the Inspector notification as a JSON
            string in ``event['Records'][0]['Sns']['Message']``.
        context: Lambda context object (unused).

    Returns:
        0 after the finding has been published to the SNS topic, 1 when the
        notification or finding was skipped.
    """
    # extract the message that Inspector sent via SNS (parsed once)
    message = json.loads(event['Records'][0]['Sns']['Message'])

    # skip everything except report_finding notifications
    notification_type = message['event']
    if notification_type != "FINDING_REPORTED":
        print('Skipping notification that is not a new finding: ' + notification_type)
        return 1

    # get finding and extract detail
    finding_arn = message['finding']
    response = inspector.describe_findings(findingArns=[finding_arn], locale='EN_US')
    finding = response['findings'][0]

    # skip uninteresting findings
    title = finding['title']
    if title in _IGNORED_TITLES:
        print('Skipping finding: ', title)
        return 1

    # Get the server name from tag
    server_name = _server_name_from_tags(finding)

    # If the server is expected to run certain public ports, ignore these
    # from the findings.
    # NOTE(review): as before, a finding on one of these servers that has no
    # port attributes at all is also skipped here - confirm this is intended.
    if server_name in _PUBLIC_SERVERS:
        if not _unexpected_ports(finding, server_name, title):
            print('No findings worthy of reporting')
            return 1

    # get the information to send via email
    subject = title[:100]  # truncate @ 100 chars, SNS subject limit
    message_body = (
        "Title:\n" + title
        + "\n\nServer: " + server_name
        + "\n\nDescription:\n" + finding['description']
        + "\n\nRecommendation:\n" + finding['recommendation']
    )
    # un-comment the following line to dump the entire finding as raw json
    #message_body = json.dumps(finding, default=enco, indent=2)

    # create SNS topic if necessary
    topic_arn = sns.create_topic(Name=SNS_TOPIC)['TopicArn']

    # create subscription if necessary
    if not _is_subscribed(topic_arn, DEST_EMAIL_ADDR):
        sns.subscribe(
            TopicArn=topic_arn,
            Protocol='email',
            Endpoint=DEST_EMAIL_ADDR,
        )

    # publish notification to topic
    sns.publish(
        TopicArn=topic_arn,
        Message=message_body,
        Subject=subject,
    )
    return 0
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment