Created
July 23, 2020 04:08
-
-
Save mig5/e3ff9c6ab08be0a2b342f22b77899563 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Copyright (c) 2016 Amazon Web Services, Inc. | |
from __future__ import print_function | |
import boto3 | |
import json | |
import datetime | |
# Name of the SNS topic that findings are published to.
# The handler creates the topic if it does not already exist.
SNS_TOPIC = "Inspector-Finding-Delivery"

# Email address that receives the findings.
# The handler subscribes it to the SNS topic if it is not subscribed already.
DEST_EMAIL_ADDR = "you@example.com"

# AWS service clients, created once when the module is imported.
inspector = boto3.client("inspector")
sns = boto3.client("sns")
def enco(obj):
    """Serialize date/time objects for ``json.dumps(..., default=enco)``.

    Args:
        obj: The object that the JSON encoder could not serialize itself.

    Returns:
        The ISO 8601 string for ``datetime.datetime`` and ``datetime.date``
        values; ``None`` (emitted as JSON ``null``) for anything else.
    """
    if isinstance(obj, (datetime.datetime, datetime.date)):
        return obj.isoformat()
    return None
# Finding titles that never warrant a notification.
_IGNORED_TITLES = (
    "Unsupported Operating System or Version",
    "No potential security issues found",
)

# Servers (identified by their "Name" tag) that are expected to expose
# certain public ports.
_PUBLIC_SERVERS = (
    'webapps (Prod)',
    'vpn.foobar.com',
    'brochure-pet-server.foobar.com',
    'public-staging-site-pw-protected.foobar.com',
)

# The only server on which the UDP allow-list applies.
_VPN_SERVER = 'vpn.foobar.com'

# Allowed port attribute values. They are compared as exact strings against
# the attribute value reported in the finding, so both the single-port and
# the range-list spellings are listed.
_TCP_ALLOWED = ('80', '443', '[[80 - 80], [443 - 443]]')
_UDP_ALLOWED = ('1194', '[[1194 - 1194]]')

# Finding attribute keys that carry port information.
_PORT_ATTRIBUTE_KEYS = ('TCP_PORTS', 'UDP_PORTS', 'PORT')


def _get_server_name(finding):
    """Return the value of the asset's "Name" tag, or "" if it has none."""
    server_name = ""
    tags = finding.get('assetAttributes', {}).get('tags', [])
    for tag in tags:
        if tag['key'] == 'Name':
            server_name = tag['value']
    return server_name


def _only_expected_ports(finding, server_name):
    """Return True if the finding is solely about ports the server may expose.

    Only servers in ``_PUBLIC_SERVERS`` are eligible. A finding that carries
    no port attributes at all is never suppressed here: it is not a port
    finding, so the port allow-list says nothing about it.
    """
    if server_name not in _PUBLIC_SERVERS:
        return False

    port_attributes = [
        attribute for attribute in finding.get('attributes', [])
        if attribute['key'] in _PORT_ATTRIBUTE_KEYS
    ]
    if not port_attributes:
        return False

    title = finding['title']
    unexpected = []
    # Build a new list instead of removing items from the list being iterated;
    # removing during iteration skips the element after each removal.
    for attribute in port_attributes:
        value = attribute['value']
        if value == "[]":
            # An empty port list carries nothing to report.
            continue
        is_allowed = value in _TCP_ALLOWED or (
            value in _UDP_ALLOWED and server_name == _VPN_SERVER
        )
        if is_allowed:
            print('Skipping finding: ', title)
        else:
            unexpected.append(value)
    return not unexpected


def _is_subscribed(topic_arn, endpoint):
    """Return True if ``endpoint`` already has a subscription on the topic."""
    paginator = sns.get_paginator('list_subscriptions_by_topic')
    for page in paginator.paginate(TopicArn=topic_arn):
        for subscription in page['Subscriptions']:
            if subscription['Endpoint'] == endpoint:
                return True
    return False


def lambda_handler(event, context):
    """Forward a newly reported Inspector finding to an SNS email topic.

    Args:
        event: The Lambda event; the Inspector notification is read from
            ``event['Records'][0]['Sns']['Message']`` as a JSON string.
        context: The Lambda context object (unused).

    Returns:
        0 when the finding was published to the SNS topic, 1 when the
        notification or finding was skipped.
    """
    # extract the message that Inspector sent via SNS (parsed once)
    notification = json.loads(event['Records'][0]['Sns']['Message'])

    # skip everything except report_finding notifications
    notification_type = notification['event']
    if notification_type != "FINDING_REPORTED":
        print('Skipping notification that is not a new finding: ' + notification_type)
        return 1

    # get finding and extract detail
    response = inspector.describe_findings(
        findingArns=[notification['finding']],
        locale='EN_US',
    )
    finding = response['findings'][0]

    # skip uninteresting findings
    title = finding['title']
    if title in _IGNORED_TITLES:
        print('Skipping finding: ', title)
        return 1

    # Get the server name from tag
    server_name = _get_server_name(finding)

    # If the server is expected to run certain public ports, ignore these
    # from the findings
    if _only_expected_ports(finding, server_name):
        print('No findings worthy of reporting')
        return 1

    # get the information to send via email
    subject = title[:100]  # truncate @ 100 chars, SNS subject limit
    message_body = (
        "Title:\n" + title
        + "\n\nServer: " + server_name
        + "\n\nDescription:\n" + finding['description']
        + "\n\nRecommendation:\n" + finding['recommendation']
    )
    # un-comment the following line to dump the entire finding as raw json
    # message_body = json.dumps(finding, default=enco, indent=2)

    # create SNS topic if necessary
    topic_arn = sns.create_topic(Name=SNS_TOPIC)['TopicArn']

    # create subscription if necessary
    if not _is_subscribed(topic_arn, DEST_EMAIL_ADDR):
        sns.subscribe(
            TopicArn=topic_arn,
            Protocol='email',
            Endpoint=DEST_EMAIL_ADDR,
        )

    # publish notification to topic
    sns.publish(
        TopicArn=topic_arn,
        Message=message_body,
        Subject=subject,
    )
    return 0
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment