Created
April 7, 2020 22:07
-
-
Save jcaxmacher/d5817899b4b100269ad1d7a44f146918 to your computer and use it in GitHub Desktop.
Security Hub Config Custom Resource
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import random | |
import time | |
import boto3 | |
# Module-level Security Hub client, created once at import time and shared by
# every function in this module.
securityhub = boto3.client('securityhub')
def configure(mas):
    """Apply the requested Security Hub configuration.

    Each step runs only when its key is present with a truthy value, in this
    order: accept the master invitation, reconcile enabled standards, disable
    individual controls.

    Args:
        mas: Mapping of settings supporting ``.get`` and item access.
            Recognised keys:
            ``MasterAccountId`` - passed to ``accept_master_invitation``.
            ``EnabledStandards`` - passed to ``enable_standards``.
            ``DisabledStandardsControls`` - passed to
            ``disable_standards_controls``.
    """
    # The body previously read an undefined name ``settings`` while the
    # parameter is called ``mas``, raising NameError on every call. The
    # parameter name is kept so existing callers are unaffected.
    settings = mas

    if settings.get('MasterAccountId'):
        accept_master_invitation(settings['MasterAccountId'])
    if settings.get('EnabledStandards'):
        enable_standards(settings['EnabledStandards'])
    if settings.get('DisabledStandardsControls'):
        disable_standards_controls(settings['DisabledStandardsControls'])
def accept_master_invitation(master_account_id):
    """Accept the pending invitation sent by ``master_account_id``.

    Only the invitations returned by a single ``list_invitations`` call are
    inspected; the first one whose ``AccountId`` matches is accepted.

    Args:
        master_account_id: Account ID the invitation must come from.

    Raises:
        Exception: if no returned invitation comes from that account.
    """
    pending = securityhub.list_invitations().get('Invitations', [])
    match = next(
        (inv for inv in pending if inv['AccountId'] == master_account_id),
        None,
    )
    if match is None:
        raise Exception(
            'No invitation present from account ID {0}'.format(master_account_id)
        )
    securityhub.accept_invitation(
        MasterId=master_account_id,
        InvitationId=match['InvitationId'],
    )
def enable_standards(standards):
    """Subscribe the account to each of the given standard ARNs.

    NOTE(review): a second function named ``enable_standards`` is defined
    later in this module and rebinds the name when the module is loaded, so
    this definition is not reachable by name afterwards. One of the two
    should probably be renamed.

    Args:
        standards: Iterable of standard ARNs to enable.
    """
    standards_subscription_requests = [
        {'StandardsArn': standard}
        for standard in standards
    ]
    # The batch API requires at least one request entry, so with nothing to
    # enable the call is skipped instead of failing validation.
    if not standards_subscription_requests:
        return
    securityhub.batch_enable_standards(
        StandardsSubscriptionRequests=standards_subscription_requests
    )
def disable_standards(standards):
    """Unsubscribe the account from each of the given standard ARNs.

    The batch-disable API takes subscription ARNs rather than standard ARNs,
    so the currently enabled standards are fetched first to translate one
    into the other.

    Args:
        standards: Iterable of standard ARNs to disable.

    Raises:
        KeyError: if a requested standard ARN is not among the enabled
            standards returned by ``get_enabled_standards``.
    """
    standards = list(standards)
    # Nothing to do; also avoids sending an empty list, which the batch API
    # rejects (it requires at least one ARN).
    if not standards:
        return

    subscriptions = securityhub.get_enabled_standards().get(
        'StandardsSubscriptions', []
    )
    enabled_standards = {
        s['StandardsArn']: s['StandardsSubscriptionArn']
        for s in subscriptions
    }
    # Look up in the local mapping (the original subscripted the
    # ``enable_standards`` function by mistake, which raised TypeError).
    standards_subscription_arns = [
        enabled_standards[standard]
        for standard in standards
    ]
    # Must be the *disable* call; the original invoked batch_enable_standards
    # with a parameter that only batch_disable_standards accepts.
    securityhub.batch_disable_standards(
        StandardsSubscriptionArns=standards_subscription_arns
    )
def wait_for_standard_readiness(standards):
    """Poll until every given standard reports the ``READY`` status.

    After an initial random pause of under one second, the enabled standards
    are fetched up to five times. Before each fetch the function sleeps for
    0, 2, 4, 6 and 8 seconds respectively, plus a random jitter below one
    second.

    Args:
        standards: Iterable of standard ARNs that must all be ``READY``.

    Raises:
        Exception: if the standards are still not all ``READY`` after the
            fifth poll.
    """
    time.sleep(random.random())

    for attempt in range(5):
        delay = (attempt * 2.0) + random.random()
        time.sleep(delay)

        subscriptions = securityhub.get_enabled_standards().get(
            'StandardsSubscriptions', []
        )
        status_by_arn = {}
        for subscription in subscriptions:
            status_by_arn[subscription['StandardsArn']] = (
                subscription['StandardsStatus']
            )

        # A standard missing from the response counts as not ready.
        not_ready = [
            arn for arn in standards
            if status_by_arn.get(arn) != 'READY'
        ]
        if not not_ready:
            print('All standards enabled and ready')
            return

    raise Exception(
        'Timed out waiting for standards to reach READY state'
    )
def _batch_enable_standards(standard_arns):
    """Subscribe the account to each standard ARN; do nothing if there are none."""
    requests = [{'StandardsArn': arn} for arn in standard_arns]
    if requests:
        securityhub.batch_enable_standards(
            StandardsSubscriptionRequests=requests
        )


def enable_standards(standard_names):
    """Make the set of enabled standards match ``standard_names`` exactly.

    Standards that are requested but not enabled are enabled, standards that
    are enabled but not requested are disabled, and the function then waits
    for the requested standards to become ready.

    Args:
        standard_names: Iterable of standard names, matched against the
            ``Name`` field returned by ``describe_standards``.

    Raises:
        KeyError: if a name is not among the available standards (kept on
            purpose so that typos fail loudly).
    """
    # Get the available standards, keyed by name
    standards = {
        s['Name']: s['StandardsArn']
        for s in securityhub.describe_standards().get('Standards', [])
    }
    # Determine which standards are enabled
    enabled_standards = {
        s['StandardsArn']
        for s in securityhub.get_enabled_standards().get(
            'StandardsSubscriptions', []
        )
    }
    # Convert standard names into the set of desired standard ARNs
    desired_standards = {standards[name] for name in standard_names}

    # Set-wise calculation of which standards are missing / extraneous.
    # (The original referenced ``describe_standards`` and ``enable_standards``
    # here, neither of which is a set.)
    missing_standards = desired_standards - enabled_standards
    extraneous_standards = enabled_standards - desired_standards

    # Enabling goes through the private helper: the name ``enable_standards``
    # refers to this very function, so calling it would recurse with ARNs
    # where names are expected.
    _batch_enable_standards(missing_standards)
    # The batch-disable API rejects an empty list, so only call when needed.
    if extraneous_standards:
        disable_standards(extraneous_standards)

    # Wait for standards to take effect
    wait_for_standard_readiness(desired_standards)
def get_control_arns():
    """Return a mapping of control ID to control ARN for all enabled standards.

    Every subscription returned by ``get_enabled_standards`` is queried with
    ``describe_standards_controls``, following ``NextToken`` so that controls
    beyond the first page of results are included.

    Returns:
        dict mapping each ``ControlId`` to its ``StandardsControlArn``. If the
        same control ID appears under more than one subscription, the last
        one seen wins.
    """
    controls = {}
    subscriptions = securityhub.get_enabled_standards().get(
        'StandardsSubscriptions', []
    )
    for subscription in subscriptions:
        request = {
            'StandardsSubscriptionArn': subscription['StandardsSubscriptionArn']
        }
        while True:
            response = securityhub.describe_standards_controls(**request)
            for control in response.get('Controls', []):
                controls[control['ControlId']] = control['StandardsControlArn']
            # Keep requesting pages until the service stops returning a token.
            next_token = response.get('NextToken')
            if not next_token:
                break
            request['NextToken'] = next_token
    return controls
def disable_standards_controls(controls):
    """Set each given control to ``DISABLED`` in Security Hub.

    Controls are processed in order and processing stops at the first
    failure, so earlier controls may already have been disabled.

    Args:
        controls: Iterable of objects exposing ``id`` (looked up in the map
            from ``get_control_arns``) and ``reason`` (sent as
            ``DisabledReason``).
            NOTE(review): these are read as attributes, so plain dicts would
            fail here - confirm what the caller passes.

    Raises:
        Exception: if a control's ``id`` is not in the map of enabled
            controls, or if the update response's HTTP status code is
            not 200.
    """
    arn_by_control_id = get_control_arns()

    for control in controls:
        control_id = control.id
        if control_id not in arn_by_control_id:
            raise Exception(
                'Control {0!r} not found in enabled Security Hub controls'.format(
                    control
                )
            )

        response = securityhub.update_standards_control(
            StandardsControlArn=arn_by_control_id[control_id],
            ControlStatus='DISABLED',
            DisabledReason=control.reason,
        )

        metadata = response.get('ResponseMetadata', {})
        status_code = metadata.get('HTTPStatusCode')
        if status_code != 200:
            raise Exception(
                'Error occurred disabling control {0!r}'.format(
                    control
                )
            )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment