Instantly share code, notes, and snippets.

@arkadiyt /main.py
Last active Jan 15, 2019

Embed
What would you like to do?
import boto3
import certbot.main
import datetime
import os
import raven
import subprocess
def read_and_delete_file(path):
    """Return the text stored at *path*, removing the file afterwards.

    The file is opened in text mode and read in full. It is removed only
    after the read has completed; if opening or reading raises, the file is
    left in place.
    """
    with open(path, 'r') as handle:
        text = handle.read()
    os.unlink(path)
    return text
def provision_cert(email, domains):
    """Obtain a certificate through certbot using the Route 53 DNS challenge.

    Args:
        email: Value passed to certbot's ``--email`` option.
        domains: Comma-separated domain names, passed to certbot's ``-d``
            option unchanged.

    Returns:
        A dict with the keys ``'certificate'``, ``'private_key'`` and
        ``'certificate_chain'`` holding the text of ``cert.pem``,
        ``privkey.pem`` and ``chain.pem`` from certbot's ``live/`` directory.
        Each of those three paths is removed after being read.
    """
    config_dir = '/tmp/config-dir/'

    # The path used to be built from the raw first domain, so a wildcard entry
    # such as '*.example.com' produced 'live/*.example.com/'. Derive a
    # wildcard-free name instead and hand it to certbot via --cert-name, so the
    # directory certbot writes and the directory read below always agree.
    first_domain = domains.split(',')[0]
    cert_name = first_domain[2:] if first_domain.startswith('*.') else first_domain

    certbot.main.main([
        'certonly',                   # Obtain a cert but don't install it
        '-n',                         # Run in non-interactive mode
        '--agree-tos',                # Agree to the terms of service,
        '--email', email,             # Email
        '--dns-route53',              # Use dns challenge with route53
        '-d', domains,                # Domains to provision certs for
        '--cert-name', cert_name,     # Pin the name of the live/ directory
        # Override directory paths so script doesn't have to be run as root
        '--config-dir', config_dir,
        '--work-dir', '/tmp/work-dir/',
        '--logs-dir', '/tmp/logs-dir/',
    ])

    path = config_dir + 'live/' + cert_name + '/'
    # NOTE(review): the entries under live/ are presumably symlinks into
    # archive/; if so, removing them leaves the underlying key material on
    # disk -- confirm and clean up config_dir if that matters.
    return {
        'certificate': read_and_delete_file(path + 'cert.pem'),
        'private_key': read_and_delete_file(path + 'privkey.pem'),
        'certificate_chain': read_and_delete_file(path + 'chain.pem')
    }
def should_provision(domains):
    """Tell whether a certificate should be issued for *domains*.

    Returns True when find_existing_cert() finds no certificate, or when the
    found certificate's ``NotAfter`` minus the current UTC time is, counted in
    whole days, 30 or fewer. Returns False otherwise.
    """
    current = find_existing_cert(domains)
    if not current:
        return True

    utc_now = datetime.datetime.now(datetime.timezone.utc)
    remaining = current['Certificate']['NotAfter'] - utc_now
    return remaining.days <= 30
def find_existing_cert(domains):
    """Look up an ACM certificate covering no names outside *domains*.

    *domains* is a comma-separated string. Certificates are listed (at most
    1000 items), each one is described, and the first whose
    ``SubjectAlternativeNames`` form a subset of the requested names is
    returned as the full ``describe_certificate`` response. Returns None when
    nothing matches.
    """
    wanted = frozenset(domains.split(','))
    acm = boto3.client('acm')
    pages = acm.get_paginator('list_certificates').paginate(
        PaginationConfig={'MaxItems': 1000},
    )
    for page in pages:
        for summary in page['CertificateSummaryList']:
            described = acm.describe_certificate(
                CertificateArn=summary['CertificateArn'],
            )
            alt_names = frozenset(described['Certificate']['SubjectAlternativeNames'])
            # Subset, not equality: a certificate covering fewer names than
            # requested still counts as a match.
            if alt_names <= wanted:
                return described
    return None
def notify_via_sns(topic_arn, domains, certificate):
    """Publish a notification about a newly issued certificate to SNS.

    *certificate* is fed on stdin to ``openssl x509 -noout -text``; whatever
    that command prints on stdout is appended to the message body after the
    *domains* string. The message is published to *topic_arn*.
    """
    completed = subprocess.run(
        ['openssl', 'x509', '-noout', '-text'],
        input=certificate,
        stdout=subprocess.PIPE,
        encoding='utf8',
    )
    body = 'Issued new certificates for domains: ' + domains + '\n\n' + completed.stdout

    sns = boto3.client('sns')
    sns.publish(
        TopicArn=topic_arn,
        Subject='Issued new LetsEncrypt certificate',
        Message=body,
    )
def upload_cert_to_acm(cert, domains):
    """Import *cert* into ACM, re-importing over a matching certificate if any.

    Args:
        cert: Dict with the keys ``'certificate'``, ``'private_key'`` and
            ``'certificate_chain'``.
        domains: Comma-separated domain names, used by find_existing_cert()
            to locate a certificate to re-import over.

    Returns:
        The ARN of the newly created certificate, or None when an existing
        certificate was re-imported.
    """
    existing_cert = find_existing_cert(domains)
    certificate_arn = existing_cert['Certificate']['CertificateArn'] if existing_cert else None

    import_args = {
        'Certificate': cert['certificate'],
        'PrivateKey': cert['private_key'],
        'CertificateChain': cert['certificate_chain'],
    }
    # boto3 validates parameter types and rejects CertificateArn=None, so the
    # key is only sent when there is an existing certificate to re-import.
    if certificate_arn:
        import_args['CertificateArn'] = certificate_arn

    client = boto3.client('acm')
    acm_response = client.import_certificate(**import_args)
    return None if certificate_arn else acm_response['CertificateArn']
def handler(event, context):
    """Entry point: issue, upload and announce a certificate when one is due.

    Presumably invoked as an AWS Lambda handler; neither *event* nor *context*
    is used. Reads LETSENCRYPT_DOMAINS, and -- only when should_provision()
    says so -- LETSENCRYPT_EMAIL and NOTIFICATION_SNS_ARN from the
    environment. Any exception is reported through a raven client built from
    SENTRY_DSN and then re-raised.
    """
    try:
        domains = os.environ['LETSENCRYPT_DOMAINS']
        if not should_provision(domains):
            return
        issued = provision_cert(os.environ['LETSENCRYPT_EMAIL'], domains)
        upload_cert_to_acm(issued, domains)
        notify_via_sns(os.environ['NOTIFICATION_SNS_ARN'], domains, issued['certificate'])
    except BaseException:
        # Catch-all on purpose: the error is reported, then propagated.
        sentry = raven.Client(os.environ['SENTRY_DSN'], transport=raven.transport.http.HTTPTransport)
        sentry.captureException()
        raise
@gb1035

This comment has been minimized.

Copy link

gb1035 commented Dec 11, 2018

Thank you for this. I believe the latest version of boto3 requires the omission of the "CertificateArn" keyword for a new cert:
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/acm.html#ACM.Client.import_certificate
I made the change here: https://gist.github.com/gb1035/3910200ee9fa580d34e69d7d8d043d44

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment