Last active
December 4, 2024 01:19
-
-
Save atheiman/de1af05a3c94b45ac59bb9d9b6e6f407 to your computer and use it in GitHub Desktop.
AWS Config custom rule for resource tag compliance evaluation. Deployed as CloudFormation stacks.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# See documented events sent by Config here: https://docs.aws.amazon.com/config/latest/developerguide/evaluate-config_develop-rules_example-events.html | |
# | |
# It is much easier to write evaluations for rules using ConfigurationItemChangeNotification and | |
# OversizedConfigurationItemChangeNotification. These notifications include the resource as recorded | |
# by Config. The Lambda function can review the resource config json and submit an evaluation for | |
# the resource. | |
# | |
# ScheduledNotification events are not specific to a resource; the event only includes
# the account id and rule name. Lambda functions must list all the resources in the account using
# service apis, call the appropriate apis to evaluate each resource's config, and then submit
# an evaluation for each resource.
import boto3 | |
import os | |
import json | |
import re | |
# Resolve the ARN partition ("aws", "aws-us-gov", "aws-cn", ...) of the region this code runs in.
# boto3 maps the region to its partition, so partitions other than GovCloud are handled as well.
# When no region is configured, fall back to the commercial partition instead of failing on None.
_session = boto3.session.Session()
_region = _session.region_name
partition = _session.get_partition_for_region(_region) if _region else "aws"

# STS client in the function's own account, used to assume the cross account role.
sts = boto3.client("sts")
# Mapping of required tag keys and optionally tag value requirements.
#   {}                       - the tag key must exist, any value is accepted
#   {"AllowedValues": [...]} - the tag value must be one of the listed values
#   {"Regex": "..."}         - the tag value must match the regular expression (re.search)
required_tags = {
    "Environment": {"AllowedValues": ["prod", "nonprod"]},
    "Application": {},
    # The dot is escaped so only the literal domain "example.com" is accepted.
    "Owner": {"Regex": r"^.+@example\.com$"},
}
# Configuration item statuses that mean the resource no longer exists.
_DELETED_STATUSES = ("ResourceDeleted", "ResourceDeletedNotRecorded")

# Annotation is limited to 256 characters by the Config api.
_MAX_ANNOTATION_LENGTH = 256


def _load_tags(ci, config, oversized):
    """Return the resource tags as a dict of tag key to tag value.

    Tags are read from the configuration item when present. Oversized notifications only carry a
    configuration item summary without tags, so in that case the recorded configuration item is
    fetched from Config (requires `config:GetResourceConfigHistory` in the resource account).

    Raises:
        Exception: if no tags can be found for the resource.
    """
    if "tags" in ci:
        # A null tags value is treated the same as an untagged resource
        return ci["tags"] or {}
    if oversized:
        history = config.get_resource_config_history(
            resourceType=ci["resourceType"],
            resourceId=ci["resourceId"],
            laterTime=ci["configurationItemCaptureTime"],
            limit=1,
        )
        items = history.get("configurationItems") or []
        if items and "tags" in items[0]:
            return items[0]["tags"] or {}
    # If this ever raises an error for a resource type, the tags might be exposed elsewhere in the
    # invokingEvent, or a boto3 service client could be used to retrieve resource tags from the
    # service.
    raise Exception(
        f"Could not find 'tags' in invokingEvent.configurationItem - {ci['resourceType']} - {ci.get('ARN')}"
    )


def _evaluate_tags(tags_dict):
    """Check tags against `required_tags` and return a (compliance_type, annotation) tuple.

    The annotation gives context to why a resource is noncompliant. It is an empty string for
    compliant resources and is truncated to the length accepted by the Config api.
    """
    missing_required = []
    annotations = []
    for required_key, value_spec in required_tags.items():
        if required_key not in tags_dict:
            # No value to validate, continue to the next required tag key
            missing_required.append(required_key)
            continue
        value = tags_dict[required_key]
        if "AllowedValues" in value_spec and value not in value_spec["AllowedValues"]:
            annotations.append(f"Tag '{required_key}' value must be one of {value_spec['AllowedValues']}.")
        if "Regex" in value_spec and not re.search(value_spec["Regex"], value):
            annotations.append(f"Tag '{required_key}' value must match regular expression '{value_spec['Regex']}'.")

    # Show any missing required tags first in the annotations
    if missing_required:
        annotations.insert(0, f"Missing required tag keys: {missing_required}.")

    # Every violation produces an annotation, so any annotation means noncompliant
    compliance_type = "NON_COMPLIANT" if annotations else "COMPLIANT"

    # Annotation must be submitted as a single string
    annotation = " ".join(annotations)
    if len(annotation) > _MAX_ANNOTATION_LENGTH:
        annotation = annotation[: _MAX_ANNOTATION_LENGTH - 3] + "..."
    return compliance_type, annotation


def handler(event, context):
    """Lambda entry point: evaluate one resource's tags and submit the result to Config.

    Handles ConfigurationItemChangeNotification events (resource included as `configurationItem`)
    and OversizedConfigurationItemChangeNotification events (only `configurationItemSummary` is
    included). Resources that were deleted or left the rule scope are reported as NOT_APPLICABLE.

    Args:
        event: Config rule invocation event, must contain `invokingEvent` and `resultToken`.
        context: Lambda context object (unused).

    Raises:
        Exception: if the event carries no configuration item or the resource tags cannot be found.
    """
    invoking_event = json.loads(event["invokingEvent"])

    ci = invoking_event.get("configurationItem")
    oversized = ci is None
    if oversized:
        ci = invoking_event.get("configurationItemSummary")
    if ci is None:
        raise Exception(
            f"No configuration item in invokingEvent - messageType {invoking_event.get('messageType')}"
        )
    print(ci["resourceType"], ci.get("ARN"))

    # Open a boto3 session in the resource account. Used to interact with Config api to submit
    # resource evaluations.
    resource_acct_session = get_session(ci["awsAccountId"])
    print("Resource acct session created:", resource_acct_session.client("sts").get_caller_identity()["Arn"])
    config = resource_acct_session.client("config")

    evaluation = {
        "ComplianceResourceType": ci["resourceType"],
        "ComplianceResourceId": ci["resourceId"],
        "ComplianceType": "COMPLIANT",
        "OrderingTimestamp": invoking_event["notificationCreationTime"],
    }

    if event.get("eventLeftScope") or ci.get("configurationItemStatus") in _DELETED_STATUSES:
        # Nothing to evaluate, and a deleted resource must not be left flagged as noncompliant
        evaluation["ComplianceType"] = "NOT_APPLICABLE"
    else:
        tags_dict = _load_tags(ci, config, oversized)
        print(json.dumps(tags_dict))
        compliance_type, annotation = _evaluate_tags(tags_dict)
        evaluation["ComplianceType"] = compliance_type
        if annotation:
            evaluation["Annotation"] = annotation
    print(json.dumps(evaluation))

    # Submit the evaluation and log the api response
    print(
        json.dumps(
            config.put_evaluations(Evaluations=[evaluation], ResultToken=event["resultToken"]),
            default=str,
        )
    )
def get_session(acct_id):
    """Return a boto3 session for account `acct_id` by assuming the cross account role.

    The role name (with path) is read from the CROSS_ACCT_ROLE_WITH_PATH environment variable,
    for example "/OrganizationAccountAccessRole" or "/some/path/MyRole".

    Args:
        acct_id: AWS account id that owns the role to assume.

    Returns:
        boto3.Session using the temporary credentials of the assumed role.

    Raises:
        KeyError: if CROSS_ACCT_ROLE_WITH_PATH is not set.
    """
    role_with_path = os.environ["CROSS_ACCT_ROLE_WITH_PATH"]
    # The value is appended directly after "role" in the ARN, so it must begin with a slash
    if not role_with_path.startswith("/"):
        role_with_path = "/" + role_with_path

    # Role session names are limited to 64 characters by STS
    session_name = os.environ.get("AWS_LAMBDA_FUNCTION_NAME", "cross-acct-config-rule")[:64]

    creds = sts.assume_role(
        RoleArn=f"arn:{partition}:iam::{acct_id}:role{role_with_path}",
        RoleSessionName=session_name,
    )["Credentials"]

    # Build the session from the temporary credentials issued for the other account
    return boto3.Session(
        aws_access_key_id=creds["AccessKeyId"],
        aws_secret_access_key=creds["SecretAccessKey"],
        aws_session_token=creds["SessionToken"],
    )
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Workload account template: deploy to each region in each workload account (ideally as a stackset).
Parameters:
  EvaluationFunctionArn:
    Type: String

Resources:
  ConfigRule:
    Type: AWS::Config::ConfigRule
    Properties:
      ConfigRuleName: required-tags
      Description: Evaluate tag policy compliance of AWS resources
      Scope:
        # Resource types evaluated by this rule. Supported values are listed at
        # https://docs.aws.amazon.com/config/latest/developerguide/resource-config-reference.html
        ComplianceResourceTypes:
          - AWS::EC2::Instance
          - AWS::RDS::DBCluster
          - AWS::RDS::DBInstance
          - AWS::S3::Bucket
      Source:
        Owner: CUSTOM_LAMBDA
        SourceIdentifier: !Ref EvaluationFunctionArn
        # "Change" notifications (ConfigurationItemChangeNotification and
        # OversizedConfigurationItemChangeNotification) are triggered per resource, which makes
        # evaluation functions much easier to write. ScheduledNotification only tells the
        # evaluation function that it is time to evaluate all resources. See the events sent by
        # Config here:
        # https://docs.aws.amazon.com/config/latest/developerguide/evaluate-config_develop-rules_example-events.html
        SourceDetails:
          - EventSource: aws.config
            MessageType: ConfigurationItemChangeNotification
          - EventSource: aws.config
            MessageType: OversizedConfigurationItemChangeNotification
          # - EventSource: aws.config
          #   MessageType: ScheduledNotification
          #   MaximumExecutionFrequency: TwentyFour_Hours
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Central account template: hosts the Config evaluation Lambda function.
# Each workload account must contain a cross account role with `config:PutEvaluations` permissions.
# Package and deploy with aws cli:
#
#   aws cloudformation package \
#     --template-file ./mgmt-acct-cfn.yml \
#     --s3-bucket my-bucket \
#     --s3-prefix cloudformation/packages \
#     --output-template-file ./mgmt-acct-cfn.packaged.yml
#
#   aws cloudformation deploy \
#     --template-file ./mgmt-acct-cfn.packaged.yml \
#     --stack-name resource-tags-custom-config-rule-mgmt \
#     --capabilities CAPABILITY_IAM
#
Parameters:
  CrossAcctRoleWithPath:
    Type: String
    Default: "/OrganizationAccountAccessRole"

Resources:
  EvaluationFunction:
    Type: AWS::Lambda::Function
    Properties:
      Code: ./config_evaluation.py
      Handler: config_evaluation.handler
      Runtime: python3.11
      Timeout: 30
      Role: !GetAtt EvaluationFunctionRole.Arn
      Environment:
        Variables:
          CROSS_ACCT_ROLE_WITH_PATH: !Ref CrossAcctRoleWithPath
      Tags:
        - Key: CfnStackId
          Value: !Ref AWS::StackId

  # Allows the Config service to invoke the evaluation function.
  # PrincipalOrgID does not work with `Principal: config.amazonaws.com`. `SourceAccount` can be
  # used to list all the accounts in an org if desired.
  EvaluationFunctionConfigPermission:
    Type: AWS::Lambda::Permission
    Properties:
      Action: lambda:InvokeFunction
      FunctionName: !GetAtt EvaluationFunction.Arn
      Principal: config.amazonaws.com

  # Execution role: basic Lambda logging plus permission to assume the cross account role.
  EvaluationFunctionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      ManagedPolicyArns:
        - !Sub "arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
      Policies:
        - PolicyName: Inline
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action: sts:AssumeRole
                Resource: !Sub "arn:${AWS::Partition}:iam::*:role${CrossAcctRoleWithPath}"

  # Log group for the function, with a retention period set.
  EvaluationFunctionLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub "/aws/lambda/${EvaluationFunction}"
      RetentionInDays: 14
      Tags:
        - Key: CfnStackId
          Value: !Ref AWS::StackId

Outputs:
  EvaluationFunctionArn:
    Value: !GetAtt EvaluationFunction.Arn
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment