"""React to an Observable alert from AWS Lambda."""
from __future__ import print_function | |
import json | |
# We can avoid needing to package requests by using botocore's included version | |
import boto3 | |
from botocore.vendored import requests | |
# Account-specific settings: replace these with the values from the
# appropriate Observable web portal
CUSTOMER = 'example'
API_USER = 'user@example.com'
API_KEY = 'API_KEY_GOES_HERE'

# Alert types this function reacts to; other alert types could be added here
ALERT_TYPES = set(['Excessive Access Attempts (External)'])

# URL template for the observations API; the `customer` placeholder is
# filled in when the request is made
OBSERVATIONS_ENDPOINT = 'https://{customer}.obsrvbl.com/api/v3/observations/all/'

# Headers sent with every request to the Observable API
REQUEST_HEADERS = dict(
    Authorization='ApiKey {user}:{key}'.format(user=API_USER, key=API_KEY),
    Accept='application/json',
)
def get_obs_data(alert_id, timeout=10):
    """Retrieve observation details for a particular alert.

    Sends a GET request to the observations endpoint for CUSTOMER, filtered
    by ``alert_id``, and returns the value stored under the ``'objects'`` key
    of the JSON response.

    :param alert_id: identifier of the alert, sent as the ``alert`` query
        parameter.
    :param timeout: seconds to wait for the server before giving up, so a
        stalled connection cannot consume the whole Lambda execution window.
    :returns: the ``'objects'`` value from the decoded JSON response.
    :raises requests.exceptions.HTTPError: if the API answers with a 4xx/5xx
        status (for example, bad credentials).
    :raises KeyError: if the response JSON has no ``'objects'`` key.
    """
    response = requests.get(
        url=OBSERVATIONS_ENDPOINT.format(customer=CUSTOMER),
        headers=REQUEST_HEADERS,
        params={'alert': alert_id},
        timeout=timeout,
    )
    # Fail loudly on an error status instead of trying to parse an error body
    response.raise_for_status()
    return response.json()['objects']
def lambda_handler(event, context):
    """Add network ACL deny rules for the remote IPs named in an alert.

    The handler reads the first SNS record in ``event``, decodes its message
    as JSON, and (for alert types listed in ALERT_TYPES) collects the
    ``remote_ip`` values of the alert's observations.  It then looks up the
    VPC of the instance named by the alert's ``source_name``, takes the first
    network ACL returned for that VPC, and creates one inbound deny-all rule
    per remote IP that is not already denied, numbering the new rules just
    below the lowest existing inbound rule.

    :param event: Lambda event; must contain
        ``event['Records'][0]['Sns']['Message']`` holding a JSON document
        with ``type``, ``id`` and ``source_name`` keys.
    :param context: Lambda context object (unused).
    :returns: None.
    """
    # Imported here so this function does not depend on edits to the file's
    # import block; botocore is already a dependency of this module.
    from botocore.exceptions import ClientError

    ec2_client = boto3.client('ec2')
    alert_json = event['Records'][0]['Sns']['Message']
    alert_data = json.loads(alert_json)

    # React to known alert types only
    alert_type = alert_data['type']
    print('Alert type is {}'.format(alert_type))
    if alert_type not in ALERT_TYPES:
        print('Unrecognized alert type, exiting')
        return

    # Extract remote IPs from observations
    alert_id = int(alert_data['id'])
    remote_ips = {
        obs_data['remote_ip']
        for obs_data in get_obs_data(alert_id)
        if 'remote_ip' in obs_data
    }
    print('Found remote ips: {}'.format(remote_ips))

    # Look up the source's VPC
    source_name = alert_data['source_name']
    instance_response = ec2_client.describe_instances(
        Filters=[{'Name': 'instance-id', 'Values': [source_name]}]
    )
    try:
        vpc_id = instance_response['Reservations'][0]['Instances'][0]['VpcId']
    except (IndexError, KeyError):
        print('Could not find instance {}'.format(source_name))
        return

    # Find the VPC's ACL
    # NOTE(review): this uses the first ACL returned for the VPC; if the VPC
    # has several ACLs it may not be the one protecting the instance's
    # subnet - confirm against the deployment.
    acl_response = ec2_client.describe_network_acls(
        Filters=[{'Name': 'vpc-id', 'Values': [vpc_id]}]
    )
    try:
        network_acl = acl_response['NetworkAcls'][0]
        acl_id = network_acl['NetworkAclId']
    except (IndexError, KeyError):
        print('Could not find ACL info for {}'.format(vpc_id))
        # Without an ACL there is nothing to update; stop here rather than
        # falling through and failing on the missing data.
        return

    entries = network_acl.get('Entries', [])
    print(repr(entries))

    # The new rules are inbound, so only inbound entries matter for choosing
    # rule numbers and for deciding what is already blocked.
    ingress_entries = [e for e in entries if not e.get('Egress')]
    rule_numbers = [e['RuleNumber'] for e in ingress_entries]
    # 32766 is the highest rule number a caller may assign; start just above
    # it when the ACL has no inbound entries at all.
    min_rule = min(rule_numbers) if rule_numbers else 32767
    already_blocked = {
        e['CidrBlock']
        for e in ingress_entries
        if 'CidrBlock' in e and e.get('RuleAction') == 'deny'
    }
    # NOTE(review): '/32' assumes every remote_ip is an IPv4 address - confirm.
    cidrs_to_block = {'{}/32'.format(x) for x in remote_ips}
    cidrs_to_block -= already_blocked

    # Create new rules (sorted so rule numbers are assigned deterministically)
    for i, cidr_block in enumerate(sorted(cidrs_to_block), 1):
        rule_number = min_rule - i
        if rule_number <= 0:
            print('Out of space for rules')
            break
        print('Creating: {}. DENY {}'.format(rule_number, cidr_block))
        try:
            create_response = ec2_client.create_network_acl_entry(
                DryRun=True,  # Comment out DryRun to make this happen for real
                NetworkAclId=acl_id,
                RuleNumber=rule_number,
                Protocol='-1',
                RuleAction='deny',
                Egress=False,
                CidrBlock=cidr_block,
            )
        except ClientError as err:
            # A successful dry run is reported as a DryRunOperation error;
            # anything else is a genuine failure and must propagate.
            if err.response.get('Error', {}).get('Code') != 'DryRunOperation':
                raise
            print('Dry run succeeded: {}'.format(err))
            continue
        print(create_response)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment