Skip to content

Instantly share code, notes, and snippets.

@jcaxmacher
Created April 7, 2020 22:07
Show Gist options
  • Save jcaxmacher/d5817899b4b100269ad1d7a44f146918 to your computer and use it in GitHub Desktop.
Save jcaxmacher/d5817899b4b100269ad1d7a44f146918 to your computer and use it in GitHub Desktop.
Security Hub Config Custom Resource
import random
import time
import boto3
securityhub = boto3.client('securityhub')
def configure(mas):
if settings.get('MasterAccountId'):
accept_master_invitation(settings['MasterAccountId'])
if settings.get('EnabledStandards'):
enable_standards(settings['EnabledStandards'])
if settings.get('DisabledStandardsControls'):
disable_standards_controls(settings['DisabledStandardsControls'])
def accept_master_invitation(master_account_id):
response = securityhub.list_invitations()
for invitation in response.get('Invitations', []):
if invitation['AccountId'] == master_account_id:
securityhub.accept_invitation(
MasterId=master_account_id,
InvitationId=invitation['InvitationId']
)
break
else:
raise Exception(
'No invitation present from account ID {0}'.format(master_account_id)
)
def enable_standards(standards):
standards_subscription_requests = [
{'StandardsArn': standard}
for standard in standards
]
securityhub.batch_enable_standards(
StandardsSubscriptionRequests=standards_subscription_requests
)
def disable_standards(standards):
enabled_standards = {
s['StandardsArn']: s['StandardsSubscriptionArn']
for s in securityhub.get_enabled_standards().\
get('StandardsSubscriptions', [])
}
standards_subscription_arns = [
enable_standards[standard]
for standard in standards
]
securityhub.batch_enable_standards(
StandardsSubscriptionArns=standards_subscription_arns
)
def wait_for_standard_readiness(standards):
# Check for successful standards enablement
# five times using retry backoff with random jitter
time.sleep(random.random())
for i in range(5):
time.sleep((i * 2.0) + random.random())
response = securityhub.get_enabled_standards()
standards_status = {
s['StandardsArn']: s['StandardsStatus']
for s in response.get('StandardsSubscriptions', [])
}
if all([standards_status.get(s) == 'READY' for s in standards]):
print('All standards enabled and ready')
break
else:
raise Exception(
'Timed out waiting for standards to reach READY state'
)
def enable_standards(standard_names):
# Get the available standards
standards = {
s['Name']: s['StandardsArn']
for s in securityhub.describe_standards().get('Standards', [])
}
# Determine which standards are enabled
enabled_standards = set([
s['StandardsArn']
for s in securityhub.get_enabled_standards().\
get('StandardsSubscriptions', [])
])
# Convert standard names into set of desired standard arns
# This will fail with KeyError if the standard name given is
# not found in the available standards (desired behaviour)
desired_standards = set([
standards[s]
for s in standard_names
])
# Set-wise calculation of which standards are missing / extraneous
missing_standards = describe_standards - enabled_standards
extraneous_standards = enable_standards - desired_standards
# Adjust enabled standards
enable_standards(missing_standards)
disable_standards(extraneous_standards)
# Wait for standards to take effect
wait_for_standard_readiness(desired_standards)
def get_control_arns():
controls = {}
for s in securityhub.get_enabled_standards().\
get('StandardsSubscriptions', []):
subscription_arn = s['StandardsSubscriptionArn']
response = securityhub.describe_standards_controls(
StandardsSubscriptionArn=subscription_arn
)
for control in response.get('Controls', []):
controls[control['ControlId']] = control['StandardsControlArn']
return controls
def disable_standards_controls(controls):
enabled_controls = get_control_arns()
for control in controls:
if control.id not in enabled_controls:
raise Exception(
'Control {0!r} not found in enabled Security Hub controls'.format(
control
)
)
else:
control_arn = enabled_controls[control.id]
response = securityhub.update_standards_control(
StandardsControlArn=control_arn,
ControlStatus='DISABLED',
DisabledReason=control.reason
)
if response.get('ResponseMetadata', {}).get('HTTPStatusCode') != 200:
raise Exception(
'Error occurred disabling control {0!r}'.format(
control
)
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment