Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Way Two: Enforce S3 Bucket Encryption (by Max Anderson)
AWSTemplateFormatVersion: '2010-09-09'
Description: Creates an event rule that detects when S3 buckets are created or encryption is turned off, and turns encryption on.
Resources:
BucketMonitoringRule:
Type: AWS::Events::Rule
Properties:
Description: 'Cloudwatch event required to implement this rule.'
Name: 'DetectS3CreateAndUpdate'
EventPattern:
source:
- aws.s3
detail-type:
- "AWS API Call via CloudTrail"
detail:
eventSource:
- s3.amazonaws.com
eventName:
- CreateBucket
- DeleteBucketEncryption
State: 'ENABLED'
Targets:
-
Arn: !GetAtt BucketMonitoringLambda.Arn
Id: 'BucketMonitoringTarget'
LambdaInvokePermission:
Type: AWS::Lambda::Permission
Properties:
FunctionName: !GetAtt BucketMonitoringLambda.Arn
Action: 'lambda:InvokeFunction'
Principal: events.amazonaws.com
SourceArn: !GetAtt BucketMonitoringRule.Arn
LambdaLogGroup:
Type: AWS::Logs::LogGroup
DependsOn: BucketMonitoringLambda
Properties:
LogGroupName: "/aws/lambda/bucket-monitoring-lambda"
RetentionInDays: 14
LambdaRole:
Type: AWS::IAM::Role
Properties:
RoleName: "BucketMonitoringRuleRole"
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service:
- lambda.amazonaws.com
- events.amazonaws.com
Action:
- sts:AssumeRole
Path: "/"
ManagedPolicyArns:
- 'arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole'
Policies:
- PolicyName: root
PolicyDocument:
Version: '2012-10-17'
Statement:
- Sid: AllowS3ApiCalls
Effect: Allow
Action:
- s3:GetEncryptionConfiguration
- s3:PutEncryptionConfiguration
- logs:CreateLogStream
- logs:PutLogEvents
Resource: "*"
BucketMonitoringLambda:
Type: AWS::Lambda::Function
Properties:
FunctionName: 's3-encrypt-lambda'
Timeout: 40
Handler: index.lambda_handler
Runtime: python3.6
MemorySize: 128
Code:
ZipFile:
!Sub |
import boto3
import os
class S3Validator:
client = boto3.client('s3')
def __init__(self, event):
self.raw_event = event
self.bucket_name = event['detail']['requestParameters']['bucketName']
def validate_encryption_configuration(self):
if not self.bucket_encrypted():
self.encrypt_bucket()
def bucket_encrypted(self):
try:
self.client.get_bucket_encryption(
Bucket=self.bucket_name
)
return True
except Exception:
return False
def encrypt_bucket(self):
response = self.client.put_bucket_encryption(
Bucket=self.bucket_name,
ServerSideEncryptionConfiguration={
'Rules': [
{
'ApplyServerSideEncryptionByDefault': {
'SSEAlgorithm': 'AES256'
}
},
]
}
)
def lambda_handler(event, context):
s3_validator = S3Validator(event)
s3_validator.validate_encryption_configuration()
Role: !GetAtt LambdaRole.Arn
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.