Skip to content

Instantly share code, notes, and snippets.

@nh2
Forked from rmarchei/route53.py
Last active October 9, 2023 17:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save nh2/f744ac591e95f0c25b501db00cf7c71a to your computer and use it in GitHub Desktop.
Save nh2/f744ac591e95f0c25b501db00cf7c71a to your computer and use it in GitHub Desktop.
route53 hook for dehydrated - python2 / python3 + boto2 version. Tested on Ubuntu 16.04
#!/usr/bin/env python3
# How to use:
#
# Ubuntu 16.04: apt install -y python-boto OR apt install -y python3-boto
#
# Specify the default profile on aws/boto profile files or use the optional AWS_PROFILE env var:
# AWS_PROFILE=example ./dehydrated -c -d example.com -t dns-01 -k /etc/dehydrated/hooks/route53.py
#
# Manually specify hosted zone:
# HOSTED_ZONE=example.com AWS_PROFILE=example ./dehydrated -c -d example.com -t dns-01 -k /etc/dehydrated/hooks/route53.py
#
# More info about dehydrated and the DNS challenge: https://github.com/lukas2511/dehydrated/wiki/Examples-for-DNS-01-hooks
# Using AWS Profiles: http://docs.aws.amazon.com/cli/latest/userguide/cli-chap-getting-started.html#cli-multiple-profiles
#
# This hook also works with dehydrated's HOOK_CHAIN="yes", which passes all domains in one invocation.
# It is recommended to use this hook with HOOK_CHAIN="yes" because it is faster to challenge domains in batch.
#
# This hook also works with wildcard certificates.
import os
import sys
from boto.route53 import *
from time import sleep
# Command-line synopsis; appended to the error messages raised when the hook
# is called with a wrong number of arguments.
USAGE_TEXT = (
    "USAGE: route53.py CHALLENGE_TYPE DOMAIN TOKEN_FILENAME_IGNORED TOKEN_VALUE"
    " [DOMAIN TOKEN_FILENAME_IGNORED TOKEN_VALUE]..."
)
def _domain_in_zone(domain_dot, zone_dot):
    """Return True if the FQDN `domain_dot` lies inside the zone `zone_dot`.

    Both names must be lower-case and carry a trailing dot. The match is done
    on whole DNS labels, so "notexample.com." is NOT inside "example.com.".
    """
    return domain_dot == zone_dot or domain_dot.endswith("." + zone_dot)


def get_zone_id(conn, domain):
    """Return the ID of the hosted zone that should hold records for `domain`.

    If the HOSTED_ZONE environment variable is set (and non-empty), that zone
    is looked up by name; otherwise all hosted zones are listed and the most
    specific (longest) zone containing `domain` is chosen.

    Args:
        conn: connection object providing `get_hosted_zone_by_name(name)` and
            `get_all_hosted_zones()`, returning the nested dict responses
            indexed below.
        domain: domain name without trailing dot, e.g. "www.example.com".

    Returns:
        The zone ID with the leading "/hostedzone/" stripped.

    Raises:
        Exception: if HOSTED_ZONE does not contain `domain`, if the zone named
            by HOSTED_ZONE cannot be looked up, or if no listed zone contains
            `domain`.
    """
    zone_id_prefix = '/hostedzone/'
    # DNS names are case-insensitive; compare in lower case with a trailing dot.
    domain_dot = "{0}.".format(domain.rstrip('.').lower())

    hosted_zone = os.environ.get('HOSTED_ZONE')
    if hosted_zone:
        # Tolerate HOSTED_ZONE given with or without the trailing dot.
        zone_dot = "{0}.".format(hosted_zone.rstrip('.').lower())
        if not _domain_in_zone(domain_dot, zone_dot):
            raise Exception("Incorrect hosted zone for domain {0}".format(domain))
        zone = conn.get_hosted_zone_by_name(zone_dot)
        # Guard against a falsy lookup result instead of failing with an
        # opaque TypeError when subscripting it below.
        if not zone:
            raise Exception("Hosted zone {0} not found".format(hosted_zone))
        return zone['GetHostedZoneResponse']['HostedZone']['Id'].replace(zone_id_prefix, '')

    zones = conn.get_all_hosted_zones()
    candidate_zones = []
    for zone in zones['ListHostedZonesResponse']['HostedZones']:
        zone_dot = zone['Name'].lower()
        if _domain_in_zone(domain_dot, zone_dot):
            # Rank by name length: the longest matching zone name is the most
            # specific one. The zone ID is the deterministic tie-breaker.
            candidate_zones.append((-len(zone_dot), zone['Id'].replace(zone_id_prefix, '')))
    if not candidate_zones:
        raise Exception("Hosted zone not found for domain {0}".format(domain))
    return min(candidate_zones)[1]
def wait_for_dns_update(conn, response, time_elapsed=0, timeout=300, sleep_time=5):
    """Block until the Route53 change described by `response` reports INSYNC.

    Args:
        conn: Route53 connection used to poll the change status.
        response: response of a record set commit; its
            ['ChangeResourceRecordSetsResponse']['ChangeInfo'] entry is polled.
        time_elapsed: seconds already spent waiting. Pass the return value of
            a previous call to share one timeout budget across several changes.
        timeout: maximum total number of seconds (including `time_elapsed`)
            to wait before giving up.
        sleep_time: seconds to sleep between two status polls.

    Returns:
        The updated `time_elapsed`, i.e. the input value plus the time slept
        in this call.

    Raises:
        Exception: if the change is still not INSYNC once `time_elapsed`
            exceeds `timeout`.
    """
    st = status.Status(conn, response['ChangeResourceRecordSetsResponse']['ChangeInfo'])
    # Poll exactly once per iteration; the result of that single poll decides
    # between "done", "timed out" and "keep waiting".
    while st.update() != 'INSYNC':
        if time_elapsed > timeout:
            raise Exception("Timed out while waiting for DNS record to be ready. Waited {0} seconds but the last status was {1}".format(time_elapsed, st))
        print("Waiting for DNS change to complete... ({0}; elapsed {1} seconds)".format(st, time_elapsed))
        sleep(sleep_time)
        time_elapsed += sleep_time
    print("DNS change completed")
    return time_elapsed
def route53_dns(domain_challenges_dict, action):
    """Create or remove the `_acme-challenge` TXT values for a set of domains.

    Args:
        domain_challenges_dict: dict mapping a domain name (without trailing
            dot) to the list of unquoted TXT challenge values for that domain.
        action: 'upsert' to publish the challenges or 'delete' to remove them
            (case-insensitive).

    For 'upsert' the function waits until every submitted change is INSYNC.
    For 'delete' the change is submitted but not waited for.

    Raises:
        ValueError: if `action` is neither 'upsert' nor 'delete'.
    """
    action = action.upper()
    if action not in ('UPSERT', 'DELETE'):
        raise ValueError("Unsupported action {0!r}, expected 'UPSERT' or 'DELETE'".format(action))

    conn = connection.Route53Connection()

    responses = []
    for domain, txt_challenges in domain_challenges_dict.items():
        print("domain: {0}".format(domain))
        print("txt_challenges: {0}".format(txt_challenges))

        zone_id = get_zone_id(conn, domain)
        name = u'_acme-challenge.{0}.'.format(domain)  # note u'' and trailing . are important here for the == below

        # Get existing record set, so we can add our challenges to it.
        # It's important that we add instead of override, to support dehydrated's HOOK_CHAIN="no",
        # (in which case we as the hook can't see all changes to make upfront).
        record_set = conn.get_all_rrsets(zone_id, name=name)
        record_exists = False
        existing_quoted_txt_challenges = []  # include "" quotes already; kept as a list because 'DELETE' may care about order
        for record in record_set:
            if record.name == name and record.type == "TXT":
                record_exists = True
                existing_quoted_txt_challenges += record.resource_records

        # TXT values are stored with surrounding double quotes.
        our_quoted_txt_challenges = set('"{0}"'.format(c) for c in txt_challenges)

        if action == 'UPSERT':
            all_quoted_txt_challenges = set(existing_quoted_txt_challenges) | our_quoted_txt_challenges
            change = record_set.add_change('UPSERT', name, type='TXT', ttl=60)
            # Sorted only to make the submitted value order deterministic.
            for txt_challenge in sorted(all_quoted_txt_challenges):
                change.add_value(txt_challenge)
            responses.append(record_set.commit())
        elif not record_exists:
            print("Challenge record " + name + " is already gone!")
        else:
            # Mirror the UPSERT path: remove only the values we were asked to
            # clean, so challenges published by other invocations for the same
            # record name (e.g. with HOOK_CHAIN="no") stay in place.
            remaining_quoted_txt_challenges = [
                c for c in existing_quoted_txt_challenges
                if c not in our_quoted_txt_challenges
            ]
            if len(remaining_quoted_txt_challenges) == len(existing_quoted_txt_challenges):
                print("Challenge values for " + name + " are already gone!")
            elif remaining_quoted_txt_challenges:
                # Some foreign values remain: rewrite the record without ours.
                change = record_set.add_change('UPSERT', name, type='TXT', ttl=60)
                for txt_challenge in remaining_quoted_txt_challenges:
                    change.add_value(txt_challenge)
                record_set.commit()
            else:
                # Nothing would remain: delete the whole record, listing the
                # existing values in their original order.
                change = record_set.add_change('DELETE', name, type='TXT', ttl=60)
                for txt_challenge in existing_quoted_txt_challenges:
                    change.add_value(txt_challenge)
                record_set.commit()
            # We don't block the hook to wait for the removal to complete.

    if responses:
        print("Waiting for all responses...")
        # The elapsed time is threaded through so that all changes share one
        # timeout budget instead of getting a fresh one each.
        time_elapsed = 0
        for response in responses:
            time_elapsed = wait_for_dns_update(conn, response, time_elapsed)
def deploy_hook_args_to_domain_challenge_dict(hook_args):
    """Group the flat hook argument list into a {domain: [challenge, ...]} dict.

    Args:
        hook_args: flat list of the form
            [DOMAIN, TOKEN_FILENAME, TOKEN_VALUE, DOMAIN, TOKEN_FILENAME, TOKEN_VALUE, ...].
            The token filename of every triple is ignored.

    Returns:
        A dict mapping each domain to the list of its token values, in the
        order they appear in `hook_args`. A domain that occurs in several
        triples collects all of their token values.

    Raises:
        ValueError: if the number of arguments is not a multiple of 3.
    """
    # Explicit check instead of `assert`, which is stripped under `python -O`.
    if len(hook_args) % 3 != 0:
        raise ValueError("wrong number of arguments, hook arguments must be multiple of 3; " + USAGE_TEXT)

    domain_dict = {}
    for i in range(0, len(hook_args), 3):
        domain, _token_filename, txt_challenge = hook_args[i:i + 3]
        domain_dict.setdefault(domain, []).append(txt_challenge)
    return domain_dict
if __name__ == "__main__":
    # Invocation: route53.py HOOK_NAME [HOOK_ARGS...]
    if len(sys.argv) < 2:
        # sys.exit with a string prints it to stderr and exits with status 1.
        sys.exit("wrong number of arguments, need at least 1; " + USAGE_TEXT)

    hook = sys.argv[1]
    hook_args = sys.argv[2:]
    print("hook: {0}".format(hook))

    # Hooks that are acknowledged but deliberately do nothing.
    ignored_hook_messages = {
        "startup_hook": "Ignoring startup_hook",
        "exit_hook": "Ignoring exit_hook",
        "deploy_cert": "Ignoring deploy_cert hook",
        "unchanged_cert": "Ignoring unchanged_cert hook",
    }

    if hook == "deploy_challenge":
        domain_challenges_dict = deploy_hook_args_to_domain_challenge_dict(hook_args)
        route53_dns(domain_challenges_dict, action='upsert')
    elif hook == "clean_challenge":
        domain_challenges_dict = deploy_hook_args_to_domain_challenge_dict(hook_args)
        route53_dns(domain_challenges_dict, action='delete')
    elif hook in ignored_hook_messages:
        print(ignored_hook_messages[hook])
    else:
        # Unknown hooks are ignored with a success status rather than treated
        # as an error.
        print("Ignoring unknown hook {0}".format(hook))

    sys.exit(0)
@niall-byrne
Copy link

Recommend changing xrange to range, allowing python3 compatibility.

@patrakov
Copy link

patrakov commented Sep 7, 2018

I have forked this, to support the case when _acme-challenge is a CNAME. This is good for security reasons - you can put the CNAME target into a separate Route53 zone, and restrict the write permissions for the AWS account to that zone only. In other words, limit the damage if the credentials are stolen - the thieves will not be able to zero out your main zone.

The fork is at https://gist.github.com/patrakov/fbf0a09c027c0d32712c8703ab614868

@tomchiverton
Copy link

Fork here removes some of the output to keep it cleaner : https://gist.github.com/tomchiverton/53ea2b2d584690959e83cd33a7e5475b

@nh2
Copy link
Author

nh2 commented Oct 9, 2023

Recommend changing xrange to range, allowing python3 compatibility.

Fixed now.

We should really group up and put this thing into a standalone repo. Using multiple Gists makes maintenance and merging features difficult.

@patrakov
Copy link

patrakov commented Oct 9, 2023

I don't use route53 anymore and thus cannot maintain my version. However, I would be happy to answer any questions about the setup that led to its creation.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment