Skip to content

Instantly share code, notes, and snippets.

@giuliocalzolari
Created February 1, 2019 09:01
Show Gist options
  • Save giuliocalzolari/5db49f832d82f2adfe62759172b3e823 to your computer and use it in GitHub Desktop.
Save giuliocalzolari/5db49f832d82f2adfe62759172b3e823 to your computer and use it in GitHub Desktop.
Lambda ACM CFM
# CloudFormation template: a Lambda-backed custom resource that requests an
# ACM certificate and writes its DNS validation records to Route 53.
# NOTE(review): no "Resources:" or "Parameters:" key is visible in this
# listing, although the template later uses "!Ref DNSZoneId" - confirm both
# exist in the template that is actually deployed.
AWSTemplateFormatVersion: '2010-09-09'
Description: Automatically validate ACM requests from this account
# Execution role assumed by the ACM approver Lambda function.
# Permissions are limited to the API calls the inline handler makes instead
# of the former "acm:*" / "route53:*" wildcards.
ACMApproverRole:
  Type: "AWS::IAM::Role"
  Properties:
    AssumeRolePolicyDocument:
      Version: "2012-10-17"
      Statement:
        - Effect: Allow
          Principal:
            Service: lambda.amazonaws.com
          Action: "sts:AssumeRole"
    Path: "/"
    Policies:
      - PolicyName: "ACMCFNLambdaPolicy"
        PolicyDocument:
          Version: "2012-10-17"
          Statement:
            # CloudWatch Logs, so the handler's print output is retained.
            - Effect: "Allow"
              Action:
                - "logs:CreateLogGroup"
                - "logs:CreateLogStream"
                - "logs:PutLogEvents"
              Resource: "*"
            # ACM: list, request, describe and delete certificates.
            - Effect: "Allow"
              Action:
                - "acm:ListCertificates"
                - "acm:RequestCertificate"
                - "acm:DescribeCertificate"
                - "acm:DeleteCertificate"
              Resource: "*"
            # Route 53: only the call used to upsert validation records,
            # and only on hosted zones.
            - Effect: "Allow"
              Action:
                - "route53:ChangeResourceRecordSets"
              Resource: "arn:aws:route53:::hostedzone/*"
# Lambda function that serves the Custom::ACMCertificate resource; its Python
# source is supplied inline through Code.ZipFile below.
ACMApproverFunction:
Type: AWS::Lambda::Function
Properties:
Code:
ZipFile: |
import sys,os,json,datetime,boto3,traceback,time,hashlib
from botocore.vendored import requests
from datetime import date, datetime
def json_serial(obj):
    """``json.dumps`` default hook: render date/datetime values as ISO text.

    Returns ``obj.isoformat()`` for ``date`` and ``datetime`` instances and
    raises ``TypeError`` for every other type.
    """
    is_temporal = isinstance(obj, date) or isinstance(obj, datetime)
    if not is_temporal:
        message = "Type %s not serializable" % type(obj)
        raise TypeError(message)
    return obj.isoformat()
def cfn_response(url, body):
    """Upload a custom-resource response document with an HTTP PUT.

    url  -- destination URL; callers pass the event's ``ResponseURL``.
    body -- the response document as JSON text.

    Raises the ``requests`` exception for a timeout or a non-2xx status, so a
    response that never arrived shows up as a failed invocation instead of
    being silently dropped.
    """
    # The empty content-type is intentional and unchanged from before.
    headers = {'content-type': '', 'content-length': str(len(body))}
    # Bound the call so a stalled connection cannot use up the whole
    # invocation.
    response = requests.put(url, data=body, headers=headers, timeout=30)
    print("response upload returned HTTP %s" % response.status_code)
    response.raise_for_status()
def acm_certificate(event, context):
    """Lambda entry point: dispatch a custom-resource request by type.

    event   -- the request; ``RequestType`` selects the handler. ``Create``
               and ``Update`` go to ``_create_acm_certificate``, anything
               else to ``_delete_acm_certificate``.
    context -- Lambda context object (unused).

    Each handler uploads its own response to ``event['ResponseURL']`` before
    returning, so the response is not uploaded a second time here.
    """
    # print(...) with one argument behaves the same on Python 2 and 3.
    print("Got %s" % json.dumps(event, default=json_serial))
    if event['RequestType'] in ('Create', 'Update'):
        _create_acm_certificate(event)
    else:
        _delete_acm_certificate(event)
def _create_acm_certificate(event):
    """Handle Create/Update: obtain a DNS-validated ACM certificate.

    Reads ``DomainName``, ``DomainSuffix`` and ``HostedZoneId`` from
    ``event['ResourceProperties']``. Reuses a pending or issued certificate
    whose primary name matches, otherwise requests a new one. Then upserts
    the validation records into the hosted zone and waits for issuance.

    The response document is uploaded to ``event['ResponseURL']`` from the
    ``finally`` clause whatever the outcome, and is also returned as JSON
    text. On success it carries the certificate ARN as ``PhysicalResourceId``
    and ``Data.CertificateArn``; on failure ``Reason`` holds the error.
    """
    poll_seconds = 10
    # Same overall wait as the former fixed sleeps (10 s + 220 s), which
    # keeps the handler inside the function's 300 s timeout.
    deadline = time.time() + 230

    # Start from a complete FAILED document so that whatever happens below,
    # the upload in ``finally`` is a well-formed response.
    ret = {
        'StackId': event['StackId'],
        'RequestId': event['RequestId'],
        'LogicalResourceId': event['LogicalResourceId'],
        'PhysicalResourceId': (event.get('PhysicalResourceId')
                               or 'certificate-not-created'),
        'Status': 'FAILED',
        'Reason': 'Certificate request did not complete',
    }
    try:
        rp = event['ResourceProperties']
        acm = boto3.client('acm')

        cs = rp['DomainSuffix'].rstrip('.')
        dn = "%s.%s" % (rp['DomainName'].rstrip('.'), cs)
        san = dn
        # Names longer than 62 characters get their host part replaced by the
        # tail of a SHA-256 digest (presumably to fit ACM's limit on the
        # primary name); the full name is still requested as a SAN.
        if len(dn) > 62:
            hashlen = 62 - len(cs)
            if hashlen < 1:
                raise ValueError(
                    "DomainSuffix %s is too long to build a certificate name"
                    % cs)
            # Encode first: hashlib needs bytes on Python 3.
            ch = hashlib.sha256(dn.encode('utf-8')).hexdigest()[-hashlen:]
            dn = "%s.%s" % (ch, cs)

        # Paginate so a match is not missed when the account holds more
        # certificates than a single page returns.
        cert_arn = None
        paginator = acm.get_paginator('list_certificates')
        pages = paginator.paginate(
            CertificateStatuses=['PENDING_VALIDATION', 'ISSUED'])
        for page in pages:
            for cert in page['CertificateSummaryList']:
                print("existing cert: %s" % cert['DomainName'])
                if cert['DomainName'] == dn:
                    cert_arn = cert['CertificateArn']

        if not cert_arn:
            response = acm.request_certificate(
                DomainName=dn,
                ValidationMethod='DNS',
                # ACM accepts at most 32 characters here.
                IdempotencyToken=event['LogicalResourceId'][:32],
                SubjectAlternativeNames=[san],
            )
            cert_arn = response['CertificateArn']

        # ACM fills in the validation records asynchronously; poll until
        # every domain has one instead of trusting a fixed delay.
        while True:
            detail = acm.describe_certificate(
                CertificateArn=cert_arn)['Certificate']
            options = detail.get('DomainValidationOptions', [])
            if options and all('ResourceRecord' in vo for vo in options):
                break
            if time.time() >= deadline:
                raise RuntimeError(
                    "Timed out waiting for DNS validation records of %s"
                    % cert_arn)
            time.sleep(poll_seconds)

        # One UPSERT per distinct record; a batch must not repeat a record.
        r53_c = []
        seen = set()
        for vo in options:
            rr = vo['ResourceRecord']
            key = (rr['Name'], rr['Type'])
            if key in seen:
                continue
            seen.add(key)
            r53_c.append({
                'Action': 'UPSERT',
                'ResourceRecordSet': {
                    'Name': rr['Name'],
                    'Type': rr['Type'],
                    'TTL': 3600,
                    'ResourceRecords': [{'Value': rr['Value']}],
                },
            })
        r53 = boto3.client('route53')
        r53.change_resource_record_sets(
            HostedZoneId=rp['HostedZoneId'],
            ChangeBatch={'Comment': 'Auth', 'Changes': r53_c},
        )

        # Return as soon as the certificate is ISSUED rather than always
        # sleeping the full period. If the deadline passes first, carry on as
        # before and report the certificate anyway.
        while detail.get('Status') != 'ISSUED' and time.time() < deadline:
            time.sleep(poll_seconds)
            detail = acm.describe_certificate(
                CertificateArn=cert_arn)['Certificate']
        if detail.get('Status') != 'ISSUED':
            print("certificate %s is still %s after waiting"
                  % (cert_arn, detail.get('Status')))

        ret['PhysicalResourceId'] = cert_arn
        ret['Data'] = {'CertificateArn': cert_arn}
        ret['Status'] = 'SUCCESS'
        ret.pop('Reason', None)
    except Exception as exc:
        traceback.print_exc()
        ret['Status'] = 'FAILED'
        # Surface the real cause in the stack events.
        ret['Reason'] = "%s: %s" % (type(exc).__name__, exc)
    finally:
        cfn_response(event['ResponseURL'], json.dumps(ret))
    return json.dumps(ret)
def _delete_acm_certificate(event):
    """Handle Delete: remove the certificate named by ``PhysicalResourceId``.

    Deletion is best effort. A failure is logged and recorded in ``Reason``
    but the response still reports SUCCESS (presumably so that a certificate
    that is in use or already gone does not block stack deletion).

    The response document is uploaded to ``event['ResponseURL']`` from the
    ``finally`` clause and also returned as JSON text.
    """
    ret = {
        'StackId': event['StackId'],
        'RequestId': event['RequestId'],
        'LogicalResourceId': event['LogicalResourceId'],
        'PhysicalResourceId': event['PhysicalResourceId'],
        'Status': 'SUCCESS',
    }
    try:
        # Created inside the try so a client error still yields a response.
        acm = boto3.client('acm')
        response = acm.delete_certificate(
            CertificateArn=event['PhysicalResourceId']
        )
        print("got response %s" % json.dumps(response, default=json_serial))
    except Exception as exc:
        traceback.print_exc()
        # Keep SUCCESS, but say what actually went wrong.
        ret['Reason'] = "Certificate was not deleted: %s: %s" % (
            type(exc).__name__, exc)
    finally:
        cfn_response(event['ResponseURL'], json.dumps(ret))
    return json.dumps(ret)
# Remaining properties of the ACMApproverFunction Lambda resource.
Description: Cloudformation Custom Resource for ACM certs using Route53 approval
# Entry point: the acm_certificate function of the inline source.
Handler: index.acm_certificate
MemorySize: 256
# NOTE(review): python2.7 is an end-of-life runtime - confirm it can still be
# deployed, and that the inline source is Python 3 compatible before raising it.
Runtime: python2.7
# Seconds; the create handler can wait roughly 230 s for ACM, so there is
# little slack below this limit.
Timeout: 300
# Execution role: the ARN of ACMApproverRole.
Role:
Fn::GetAtt: ACMApproverRole.Arn
# Custom resource served by ACMApproverFunction: asks for a certificate for
# <DomainName>.<DomainSuffix>, validated through the given hosted zone.
ALBCertificate:
  Type: "Custom::ACMCertificate"
  Version: "1.0"
  Properties:
    ServiceToken:
      Fn::GetAtt: [ACMApproverFunction, Arn]
    DomainName: "www"
    DomainSuffix: "example.com"
    HostedZoneId:
      Ref: DNSZoneId
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment