Created
October 31, 2021 05:24
-
-
Save MasahiroKawahara/d3e4e0c535be50efc8ec2c5d9fb3731a to your computer and use it in GitHub Desktop.
AWS Security Hub の『基礎セキュリティのベストプラクティス』特定コントロールを有効化(無効化) する CFnテンプレート
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
AWSTemplateFormatVersion: '2010-09-09' | |
Parameters: | |
AFSBPEnableList: | |
Type: CommaDelimitedList | |
Default: "" | |
AFSBPDisableList: | |
Type: CommaDelimitedList | |
Default: "" | |
RoleNamePrefix: | |
Type: String | |
Default: InitialSetupRole-UpdateSecurityHubControls | |
Resources: | |
UpdateControls: | |
Type: Custom::UpdateControls | |
Properties: | |
ServiceToken: !GetAtt "UpdateControlsLambdaFunction.Arn" | |
AFSBPEnableList: !Ref AFSBPEnableList | |
AFSBPDisableList: !Ref AFSBPDisableList | |
Region: !Ref "AWS::Region" | |
UpdateControlsLambdaFunction: | |
Type: AWS::Lambda::Function | |
Properties: | |
Role: !GetAtt "LambdaExecutionRole.Arn" | |
Runtime: "python3.8" | |
Handler: index.lambda_handler | |
Timeout: 600 | |
Code: | |
ZipFile: | | |
import sys | |
import boto3 | |
import cfnresponse | |
from logging import getLogger, INFO, log | |
logger = getLogger() | |
logger.setLevel(INFO) | |
def parse_event(event): | |
region = event['ResourceProperties']['Region'] | |
# 基礎セキュリティのベストプラクティス(AFSBP) | |
afsbp_future_enabled_ids = event['ResourceProperties']['AFSBPEnableList'] | |
afsbp_future_disabled_ids = event['ResourceProperties']['AFSBPDisableList'] | |
# Enableリストに無いコントロールIDのみ Disable の対象とする | |
afsbp_future_disabled_ids = [ | |
i for i in afsbp_future_disabled_ids if i not in afsbp_future_enabled_ids] | |
return { | |
"AFSBPFutureEnabled": afsbp_future_enabled_ids, | |
"AFSBPFutureDisabled": afsbp_future_disabled_ids, | |
"Region": region | |
} | |
def get_afsbp_current_controls(securityhub, region, account_id): | |
ARN_FORMAT = "arn:aws:securityhub:{region}:{account_id}:subscription/aws-foundational-security-best-practices/v/1.0.0" | |
subscription_arn = ARN_FORMAT.format(region=region, account_id=account_id) | |
controls = [] | |
# コントロール一覧の取得 | |
response = securityhub.describe_standards_controls( | |
StandardsSubscriptionArn=subscription_arn) | |
controls = controls + response['Controls'] | |
while 'NextToken' in response: | |
response = securityhub.describe_standards_controls( | |
StandardsSubscriptionArn=subscription_arn, | |
NextToken=response['NextToken']) | |
controls = controls + response['Controls'] | |
return controls | |
def disable_afsbp_controls(securityhub, region, account_id, future_disabled_ids, current_enabled_ids): | |
ARN_FORMAT = "arn:aws:securityhub:{region}:{account_id}:control/aws-foundational-security-best-practices/v/1.0.0/{control_id}" | |
for control_id in future_disabled_ids: | |
# 現状 Enabled でない コントロールはスキップ | |
if control_id not in current_enabled_ids: | |
logger.info( | |
" {} is already disabled, or not exist".format(control_id)) | |
continue | |
# コントロールの無効化 | |
logger.info(" disabling {}".format(control_id)) | |
control_arn = ARN_FORMAT.format( | |
region=region, account_id=account_id, control_id=control_id) | |
response = securityhub.update_standards_control( | |
StandardsControlArn=control_arn, | |
ControlStatus='DISABLED', | |
DisabledReason="Disabled by automated setup") | |
if response['ResponseMetadata']['HTTPStatusCode'] == 200: | |
logger.info(" disable successfully") | |
else: | |
logger.info(" disable failed") | |
def enable_afsbp_controls(securityhub, region, account_id, future_enabled_ids, current_disabled_ids): | |
ARN_FORMAT = "arn:aws:securityhub:{region}:{account_id}:control/aws-foundational-security-best-practices/v/1.0.0/{control_id}" | |
for control_id in future_enabled_ids: | |
# 現状 Disabled でない コントロールはスキップ | |
if control_id not in current_disabled_ids: | |
logger.info( | |
" {} is already enabled, or not exist".format(control_id)) | |
continue | |
# コントロールの有効化 | |
logger.info(" enabling {}".format(control_id)) | |
control_arn = ARN_FORMAT.format( | |
region=region, account_id=account_id, control_id=control_id) | |
response = securityhub.update_standards_control( | |
StandardsControlArn=control_arn, | |
ControlStatus='ENABLED') | |
if response['ResponseMetadata']['HTTPStatusCode'] == 200: | |
logger.info(" enable successfully") | |
else: | |
logger.info(" enable failed") | |
def update_securityhub_controls(event, context): | |
logger.info('[START] update_securityhub_controls') | |
try: | |
input = parse_event(event) | |
afsbp_future_enabled_ids = input['AFSBPFutureEnabled'] | |
afsbp_future_disabled_ids = input['AFSBPFutureDisabled'] | |
region = input['Region'] | |
logger.info("afsbp_future_enabled_ids={}".format( | |
afsbp_future_enabled_ids)) | |
logger.info("afsbp_future_disabled_ids={}".format( | |
afsbp_future_disabled_ids)) | |
logger.info("region={}".format(region)) | |
account_id = boto3.client('sts').get_caller_identity()['Account'] | |
securityhub = boto3.client('securityhub', region_name=region) | |
# 現状のコントロール情報の取得 | |
controls = get_afsbp_current_controls(securityhub, region, account_id) | |
afsbp_current_enabled_ids = [c['ControlId'] | |
for c in controls if c['ControlStatus'] == 'ENABLED'] | |
afsbp_current_disabled_ids = [c['ControlId'] | |
for c in controls if c['ControlStatus'] == 'DISABLED'] | |
logger.info("afsbp_current_enabled_ids={}".format( | |
afsbp_current_enabled_ids)) | |
logger.info("afsbp_current_disabled_ids={}".format( | |
afsbp_current_disabled_ids)) | |
# コントロールの無効化処理 | |
logger.info("disabling controls...") | |
disable_afsbp_controls( | |
securityhub, region, account_id, afsbp_future_disabled_ids, afsbp_current_enabled_ids) | |
# コントロールの有効化処理 | |
logger.info("enabling controls...") | |
enable_afsbp_controls( | |
securityhub, region, account_id, afsbp_future_enabled_ids, afsbp_current_disabled_ids) | |
# [End] | |
logger.info('[END] disable_securityhub_controls') | |
except Exception as e: | |
logger.error(e) | |
cfnresponse.send(event, context, cfnresponse.FAILED, | |
{'Response': 'Failure'}) | |
exit() | |
def lambda_handler(event, context): | |
logger.info('[START] lambda_handler') | |
if event['RequestType'] == 'Create': | |
update_securityhub_controls(event, context) | |
cfnresponse.send(event, context, cfnresponse.SUCCESS, | |
{'Response': 'Success'}) | |
if event['RequestType'] == 'Update': | |
update_securityhub_controls(event, context) | |
cfnresponse.send(event, context, cfnresponse.SUCCESS, | |
{'Response': 'Success'}) | |
if event['RequestType'] == 'Delete': | |
cfnresponse.send(event, context, cfnresponse.SUCCESS, | |
{'Response': 'Success'}) | |
logger.info('[END] lambda_handler') | |
LambdaExecutionRole: | |
Type: AWS::IAM::Role | |
Properties: | |
RoleName: !Sub "${RoleNamePrefix}-${AWS::Region}" | |
AssumeRolePolicyDocument: | |
Version: "2012-10-17" | |
Statement: | |
- Effect: Allow | |
Principal: | |
Service: "lambda.amazonaws.com" | |
Action: "sts:AssumeRole" | |
ManagedPolicyArns: | |
- "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole" | |
Policies: | |
- PolicyName: securityhub-actions | |
PolicyDocument: | |
Version: "2012-10-17" | |
Statement: | |
- Effect: Allow | |
Action: | |
- "sts:GetCallerIdentity" | |
- "securityhub:DescribeStandardsControls" | |
- "securityhub:UpdateStandardsControl" | |
Resource: "*" |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment