Skip to content

Instantly share code, notes, and snippets.

@atheiman
Last active December 4, 2024 01:19
Show Gist options
  • Save atheiman/de1af05a3c94b45ac59bb9d9b6e6f407 to your computer and use it in GitHub Desktop.
Save atheiman/de1af05a3c94b45ac59bb9d9b6e6f407 to your computer and use it in GitHub Desktop.
AWS Config custom rule for resource tag compliance evaluation. Deployed as CloudFormation stacks.
# See documented events sent by Config here: https://docs.aws.amazon.com/config/latest/developerguide/evaluate-config_develop-rules_example-events.html
#
# It is much easier to write evaluations for rules using ConfigurationItemChangeNotification and
# OversizedConfigurationItemChangeNotification. These notifications include the resource as recorded
# by Config. The Lambda function can review the resource config json and submit an evaluation for
# the resource.
#
# ScheduledNotification events are not specific to a resource; the event only includes the
# account id and rule name. For those events the Lambda function must list all the resources
# in the account using service apis, call the appropriate apis to read each resource's config,
# and then submit an evaluation for each resource.
import boto3
import os
import json
import re
# Resolve the ARN partition from the region this code runs in. The partition is used to build
# the cross account role ARN. Fall back to the AWS_REGION environment variable (and finally to
# an empty string) so an unset boto3 region does not raise AttributeError on `None.startswith`.
_region = boto3.session.Session().region_name or os.environ.get("AWS_REGION", "")
if _region.startswith("us-gov-"):
    partition = "aws-us-gov"
elif _region.startswith("cn-"):
    # China regions (cn-north-1, cn-northwest-1) live in their own partition
    partition = "aws-cn"
else:
    partition = "aws"
# STS client using the function's own credentials. Used to assume the cross account role.
sts = boto3.client("sts")
# Mapping of required tag keys and optionally tag value requirements. Each value spec may hold:
#   "AllowedValues": list of the only tag values accepted for the key
#   "Regex": regular expression the tag value must match (evaluated with re.search)
# An empty spec only requires the tag key to be present.
required_tags = {
    "Environment": {"AllowedValues": ["prod", "nonprod"]},
    "Application": {},
    # The dot before "com" is escaped so it only matches a literal ".", not any character.
    "Owner": {"Regex": r"^.+@example\.com$"},
}
def handler(event, context):
    """Lambda entry point: evaluate one resource's tags and report the result to AWS Config.

    Handles both ConfigurationItemChangeNotification events (full ``configurationItem`` in the
    invoking event) and OversizedConfigurationItemChangeNotification events (only a
    ``configurationItemSummary``; the recorded item is then read back from Config with
    ``get_resource_config_history``, which the cross account role must be allowed to call).

    Resources that were deleted or that left the rule scope are reported as NOT_APPLICABLE
    instead of being evaluated against the tag requirements.

    Args:
        event: Config rule invocation event. Reads ``invokingEvent`` (JSON string),
            ``resultToken`` and, when present, ``eventLeftScope``.
        context: Lambda context object (unused).

    Raises:
        KeyError: the invoking event has neither ``configurationItem`` nor
            ``configurationItemSummary`` (for example a ScheduledNotification).
        Exception: the resource tags could not be found for the resource.
    """
    # print(json.dumps(event, default=str))
    invoking_event = json.loads(event["invokingEvent"])
    # print(json.dumps(invoking_event, default=str))

    # Oversized notifications carry a summary instead of the full configuration item. The summary
    # still holds the identifying fields (account id, resource type / id, ARN, status).
    ci = invoking_event.get("configurationItem")
    oversized = ci is None
    if oversized:
        ci = invoking_event["configurationItemSummary"]
    # print(json.dumps(ci, default=str))
    print(ci["resourceType"], ci["ARN"])

    # Open a boto3 session in the resource account. Used to interact with Config api to submit
    # resource evaluations.
    resource_acct_session = get_session(ci["awsAccountId"])
    print("Resource acct session created:", resource_acct_session.client("sts").get_caller_identity()["Arn"])
    config = resource_acct_session.client("config")

    evaluation = {
        "ComplianceResourceType": ci["resourceType"],
        "ComplianceResourceId": ci["resourceId"],
        "ComplianceType": "COMPLIANT",
        "OrderingTimestamp": invoking_event["notificationCreationTime"],
    }

    deleted_statuses = ("ResourceDeleted", "ResourceDeletedNotRecorded")
    if event.get("eventLeftScope") or ci.get("configurationItemStatus") in deleted_statuses:
        # A deleted / out-of-scope resource has no tags to fix; marking it NOT_APPLICABLE clears
        # any earlier result instead of leaving a stale NON_COMPLIANT entry behind.
        evaluation["ComplianceType"] = "NOT_APPLICABLE"
    else:
        if oversized:
            ci = _fetch_recorded_item(config, ci)
        # Verify resource tags are exposed in the CI. If this ever raises an error for a resource
        # type, the tags might be exposed elsewhere in the invokingEvent, or a boto3 service
        # client could be used to retrieve resource tags from the service.
        if "tags" not in ci:
            raise Exception(
                f"Could not find 'tags' in invokingEvent.configurationItem - {ci['resourceType']} - {ci['ARN']}"
            )
        # Treat a null tags value the same as "no tags" rather than failing on `in None`
        tags_dict = ci["tags"] or {}
        print(json.dumps(tags_dict))

        compliance_type, annotation = _evaluate_tags(tags_dict, required_tags)
        evaluation["ComplianceType"] = compliance_type
        if annotation:
            evaluation["Annotation"] = annotation

    print(json.dumps(evaluation))
    # Submit the evaluation and log the api response
    print(
        json.dumps(
            config.put_evaluations(Evaluations=[evaluation], ResultToken=event["resultToken"]),
            default=str,
        )
    )


def _fetch_recorded_item(config, summary):
    """Return the configuration item Config recorded for an oversized notification's summary."""
    history = config.get_resource_config_history(
        resourceType=summary["resourceType"],
        resourceId=summary["resourceId"],
        laterTime=summary["configurationItemCaptureTime"],
        limit=1,
    )
    items = history.get("configurationItems") or []
    if not items:
        raise Exception(
            f"Could not find a recorded configuration item - {summary['resourceType']} - {summary['ARN']}"
        )
    item = items[0]
    # The Config api returns the ARN under 'arn'; keep the 'ARN' key the notification format uses
    # so later error messages can be built the same way for both notification types.
    item.setdefault("ARN", summary["ARN"])
    return item


def _evaluate_tags(tags_dict, required):
    """Check tags against the required tag specs; return ``(compliance_type, annotation)``.

    ``annotation`` is an empty string when the tags are compliant.
    """
    compliance_type = "COMPLIANT"
    missing_required = []
    # Annotation is a message used to give more context to why a resource is noncompliant. Build
    # a list of annotations for violations to be joined as a string later.
    annotations = []

    for required_key, value_spec in required.items():
        # Verify the required tag exists
        if required_key not in tags_dict:
            compliance_type = "NON_COMPLIANT"
            missing_required.append(required_key)
            # Continue to next required tag key, no need for further evaluation
            continue

        tag_value = tags_dict[required_key]
        if "AllowedValues" in value_spec and tag_value not in value_spec["AllowedValues"]:
            compliance_type = "NON_COMPLIANT"
            annotations.append(f"Tag '{required_key}' value must be one of {value_spec['AllowedValues']}.")
        if "Regex" in value_spec and not re.search(value_spec["Regex"], tag_value):
            compliance_type = "NON_COMPLIANT"
            annotations.append(f"Tag '{required_key}' value must match regular expression '{value_spec['Regex']}'.")

    # Show any missing required tags first in the annotations
    if missing_required:
        annotations.insert(0, f"Missing required tag keys: {missing_required}.")

    # Annotation must be submitted as a single string
    annotation = " ".join(annotations)
    # Annotation is limited to 256 characters by the Config api
    if len(annotation) > 256:
        annotation = annotation[:253] + "..."
    return compliance_type, annotation
def get_session(acct_id):
    """Return a boto3 session authenticated as the cross account role in ``acct_id``.

    The role ARN is built from the module-level ``partition``, the given account id and the
    CROSS_ACCT_ROLE_WITH_PATH environment variable (role path and name, appended directly after
    ``role``). The role session is named after AWS_LAMBDA_FUNCTION_NAME when that variable is
    set, otherwise "cross-acct-config-rule".
    """
    role_arn = f"arn:{partition}:iam::{acct_id}:role{os.environ['CROSS_ACCT_ROLE_WITH_PATH']}"
    session_name = os.environ.get("AWS_LAMBDA_FUNCTION_NAME", "cross-acct-config-rule")

    # Assume the role with the module-level STS client (the function's own credentials)
    assumed = sts.assume_role(RoleArn=role_arn, RoleSessionName=session_name)
    temporary = assumed["Credentials"]

    # Build a session in the other account from the temporary credentials
    session_kwargs = {
        "aws_access_key_id": temporary["AccessKeyId"],
        "aws_secret_access_key": temporary["SecretAccessKey"],
        "aws_session_token": temporary["SessionToken"],
    }
    return boto3.Session(**session_kwargs)
# Workload account template. Deploy it to each region in each workload account (ideally as a
# stackset). It creates the custom Config rule that invokes the evaluation function.
Parameters:
  # Used as the rule's SourceIdentifier, i.e. the Lambda function Config invokes
  EvaluationFunctionArn:
    Type: String

Resources:
  ConfigRule:
    Type: AWS::Config::ConfigRule
    Properties:
      ConfigRuleName: required-tags
      Description: Evaluate tag policy compliance of AWS resources
      Scope:
        # Only these resource types trigger the rule. Resource types:
        # https://docs.aws.amazon.com/config/latest/developerguide/resource-config-reference.html
        ComplianceResourceTypes:
          - "AWS::EC2::Instance"
          - "AWS::RDS::DBCluster"
          - "AWS::RDS::DBInstance"
          - "AWS::S3::Bucket"
      Source:
        Owner: CUSTOM_LAMBDA
        SourceIdentifier: !Ref EvaluationFunctionArn
        SourceDetails:
          # The two "Change" notification types send the resource config to the evaluation
          # function, which makes the evaluation much easier to write. ScheduledNotification
          # only tells the function that it is time to evaluate all resources. See documented
          # events sent by Config here:
          # https://docs.aws.amazon.com/config/latest/developerguide/evaluate-config_develop-rules_example-events.html
          - EventSource: aws.config
            MessageType: ConfigurationItemChangeNotification
          - EventSource: aws.config
            MessageType: OversizedConfigurationItemChangeNotification
          # - MessageType: ScheduledNotification
          #   MaximumExecutionFrequency: TwentyFour_Hours
          #   EventSource: aws.config
# Central (management) account template. It hosts the Config evaluation Lambda function, the
# function's execution role and its log group.
# A cross account role needs to exist in each workload account with `config:PutEvaluations`
# permissions.
#
# Package and deploy with aws cli:
#
#   aws cloudformation package \
#     --template-file ./mgmt-acct-cfn.yml \
#     --s3-bucket my-bucket \
#     --s3-prefix cloudformation/packages \
#     --output-template-file ./mgmt-acct-cfn.packaged.yml
#
#   aws cloudformation deploy \
#     --template-file ./mgmt-acct-cfn.packaged.yml \
#     --stack-name resource-tags-custom-config-rule-mgmt \
#     --capabilities CAPABILITY_IAM
#
Parameters:
  # Path and name of the role to assume in each workload account, starting with "/". The value
  # is appended directly after "role" when the role ARN is built.
  CrossAcctRoleWithPath:
    Type: String
    Default: "/OrganizationAccountAccessRole"

Resources:
  # Execution role of the evaluation function: basic Lambda logging plus permission to assume
  # the cross account role in any account.
  EvaluationFunctionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action:
              - "sts:AssumeRole"
      ManagedPolicyArns:
        - !Sub "arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
      Policies:
        - PolicyName: Inline
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action: sts:AssumeRole
                Resource: !Sub "arn:${AWS::Partition}:iam::*:role${CrossAcctRoleWithPath}"

  # The evaluation function itself. `Code` is a local path that `aws cloudformation package`
  # uploads and rewrites.
  EvaluationFunction:
    Type: AWS::Lambda::Function
    Properties:
      Runtime: python3.11
      Handler: config_evaluation.handler
      Code: ./config_evaluation.py
      Timeout: 30
      Role: !GetAtt EvaluationFunctionRole.Arn
      Environment:
        Variables:
          CROSS_ACCT_ROLE_WITH_PATH: !Ref CrossAcctRoleWithPath
      Tags:
        - Key: CfnStackId
          Value: !Ref AWS::StackId

  # Lets the Config service invoke the evaluation function.
  EvaluationFunctionConfigPermission:
    Type: AWS::Lambda::Permission
    Properties:
      Action: lambda:InvokeFunction
      FunctionName: !GetAtt EvaluationFunction.Arn
      Principal: config.amazonaws.com
      # PrincipalOrgID does not work with `Principal: config.amazonaws.com`. `SourceAccount` can
      # be used to list all the accounts in an org if desired.

  # Log group for the function, declared so retention and tags are managed by the stack.
  EvaluationFunctionLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub "/aws/lambda/${EvaluationFunction}"
      RetentionInDays: 14
      Tags:
        - Key: CfnStackId
          Value: !Ref AWS::StackId

Outputs:
  # Pass this value as the EvaluationFunctionArn parameter of the Config rule template
  EvaluationFunctionArn:
    Value: !GetAtt EvaluationFunction.Arn
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment