Last active January 18, 2024 00:12
import boto3
import certbot.main
import datetime
import os
import raven
import subprocess
def read_and_delete_file(path):
    """Return the text contents of ``path`` and remove the file from disk.

    The callers in this file use it to pull certificate and private-key
    material out of certbot's output directory under /tmp, so the file is
    deleted as soon as it has been read.

    Args:
        path: Filesystem path of the file to consume.

    Returns:
        The complete contents of the file as a string.

    Raises:
        OSError: If the file cannot be opened, read, or removed.
    """
    with open(path, 'r') as file:
        contents = file.read()
    # Delete only after the ``with`` block has closed the handle.
    os.remove(path)
    return contents
def provision_cert(email, domains):
    """Run certbot to obtain a certificate and return its PEM contents.

    Args:
        email: Account email passed to certbot via ``--email``.
        domains: Comma-separated domain names passed to certbot via ``-d``.

    Returns:
        A dict with the keys ``'certificate'``, ``'private_key'`` and
        ``'certificate_chain'``, each holding the text of the matching PEM
        file. The files are deleted from disk once read.
    """
    certbot.main.main([
        'certonly',       # Obtain a cert but don't install it
        '-n',             # Run in non-interactive mode
        '--agree-tos',    # Agree to the terms of service,
        '--email', email,  # Email
        '--dns-route53',  # Use dns challenge with route53
        '-d', domains,    # Domains to provision certs for
        # Override directory paths so script doesn't have to be run as root
        '--config-dir', '/tmp/config-dir/',
        '--work-dir', '/tmp/work-dir/',
        '--logs-dir', '/tmp/logs-dir/',
    ])

    # The output is read from the live/ directory named after the first
    # requested domain, beneath the --config-dir given above.
    first_domain = domains.split(',')[0]
    path = '/tmp/config-dir/live/' + first_domain + '/'
    return {
        'certificate': read_and_delete_file(path + 'cert.pem'),
        'private_key': read_and_delete_file(path + 'privkey.pem'),
        'certificate_chain': read_and_delete_file(path + 'chain.pem'),
    }
def should_provision(domains):
    """Decide whether a new certificate needs to be issued for ``domains``.

    Args:
        domains: Comma-separated domain names.

    Returns:
        True when no existing certificate is found, or when the existing
        one has 30 or fewer whole days left before its ``NotAfter`` time;
        False otherwise.
    """
    existing_cert = find_existing_cert(domains)
    if not existing_cert:
        return True

    # Use an aware UTC timestamp: subtracting a naive datetime from an aware
    # one raises TypeError.
    # NOTE(review): assumes boto3 returns NotAfter as a timezone-aware
    # datetime -- confirm.
    now = datetime.datetime.now(datetime.timezone.utc)
    not_after = existing_cert['Certificate']['NotAfter']
    return (not_after - now).days <= 30
def find_existing_cert(domains):
    """Look up an ACM certificate whose names are covered by ``domains``.

    Args:
        domains: Comma-separated domain names.

    Returns:
        The ``describe_certificate`` response of the first listed
        certificate whose SubjectAlternativeNames are all contained in
        ``domains``, or None when no listed certificate qualifies.
    """
    wanted = frozenset(domains.split(','))

    acm = boto3.client('acm')
    pages = acm.get_paginator('list_certificates').paginate(
        PaginationConfig={'MaxItems':1000})

    # NOTE(review): list_certificates is called without filters; confirm the
    # service's default filtering still lists every certificate this script
    # imports.
    for page in pages:
        for summary in page['CertificateSummaryList']:
            # The summary is only used for its ARN; the SAN list is read
            # from the full description.
            described = acm.describe_certificate(
                CertificateArn=summary['CertificateArn'])
            names = frozenset(described['Certificate']['SubjectAlternativeNames'])
            if names <= wanted:
                return described

    return None
def notify_via_sns(topic_arn, domains, certificate):
    """Publish an SNS message announcing a newly issued certificate.

    The certificate is piped through ``openssl x509 -noout -text`` and the
    resulting text dump is appended to the message body.

    Args:
        topic_arn: ARN of the SNS topic to publish to.
        domains: Comma-separated domain names, included in the message text.
        certificate: PEM-encoded certificate text fed to openssl on stdin.
    """
    process = subprocess.Popen(
        ['openssl', 'x509', '-noout', '-text'],
        stdin=subprocess.PIPE, stdout=subprocess.PIPE, encoding='utf8')
    # stderr is not piped, so the second element is always None.
    stdout, _ = process.communicate(certificate)

    client = boto3.client('sns')
    client.publish(
        TopicArn=topic_arn,
        Subject='Issued new LetsEncrypt certificate',
        Message='Issued new certificates for domains: ' + domains + '\n\n' + stdout,
    )
def upload_cert_to_acm(cert, domains):
    """Import ``cert`` into ACM, re-importing over an existing match if any.

    Args:
        cert: Dict with the keys ``'certificate'``, ``'private_key'`` and
            ``'certificate_chain'``, as returned by ``provision_cert``.
        domains: Comma-separated domain names used to find an existing
            certificate to replace.

    Returns:
        The ARN of the newly created ACM certificate, or None when an
        existing certificate was re-imported in place.
    """
    existing_cert = find_existing_cert(domains)
    certificate_arn = existing_cert['Certificate']['CertificateArn'] if existing_cert else None

    import_kwargs = {
        'Certificate': cert['certificate'],
        'PrivateKey': cert['private_key'],
        'CertificateChain': cert['certificate_chain'],
    }
    # Pass CertificateArn only for a re-import; it is left out entirely for a
    # first-time import rather than sent as None.
    if certificate_arn:
        import_kwargs['CertificateArn'] = certificate_arn

    client = boto3.client('acm')
    acm_response = client.import_certificate(**import_kwargs)

    return None if certificate_arn else acm_response['CertificateArn']
def handler(event, context):
    """Lambda entry point: renew the certificate when it is due.

    Reads its configuration from the environment variables
    ``LETSENCRYPT_DOMAINS``, ``LETSENCRYPT_EMAIL``, ``NOTIFICATION_SNS_ARN``
    and, on failure, ``SENTRY_DSN``.

    Args:
        event: Invocation event; not used.
        context: Invocation context; not used.

    Raises:
        Exception: Any failure is reported to Sentry and then re-raised.
    """
    try:
        domains = os.environ['LETSENCRYPT_DOMAINS']
        if should_provision(domains):
            cert = provision_cert(os.environ['LETSENCRYPT_EMAIL'], domains)
            upload_cert_to_acm(cert, domains)
            notify_via_sns(os.environ['NOTIFICATION_SNS_ARN'], domains, cert['certificate'])
    except Exception:
        # The Sentry client is created only on failure, so SENTRY_DSN is not
        # read on a successful run.
        client = raven.Client(os.environ['SENTRY_DSN'], transport=raven.transport.http.HTTPTransport)
        client.captureException()
        raise
gb1035 commented Jan 18, 2024

How did you deploy the lambda? Was it from a zip file, from a container image, or by manually editing it in the web console?

