-
-
Save mmalone/12f5422b2ec68e64e9d11eae0c6ca47d to your computer and use it in GitHub Desktop.
Example of obtaining a certificate using ACME and serving HTTPS in Python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Example HTTPS server that obtains a certificate using ACME `http-01` | |
challenge and auto-renews ahead of expiry. | |
The workflow consists of: | |
(Account creation) | |
- Create account key | |
- Register account and accept TOS | |
(Certificate actions) | |
- Select HTTP-01 within offered challenges by the CA server | |
- Set up http challenge resource | |
- Set up standalone web server | |
- Create domain private key and CSR | |
- Issue certificate | |
- Start HTTPS server | |
- Renew certificate periodically and update server | |
Things that could be improved:
- Don't write cert/key to disk (ssl module makes this hard)
- Better timer loop that cancels on SIGINT/SIGTERM (SIGKILL cannot be caught or handled)
- Locking or some form of synchronization between writing the CERT_FILE and reading it
- Caching or something so cert isn't loaded with every connection
""" | |
import BaseHTTPServer | |
import datetime | |
import OpenSSL | |
import os | |
import pem | |
import signal | |
import ssl | |
import threading | |
from contextlib import contextmanager | |
from cryptography import x509 | |
from cryptography.hazmat.backends import default_backend | |
from cryptography.hazmat.primitives.asymmetric import rsa | |
from acme import challenges | |
from acme import client | |
from acme import crypto_util | |
from acme import errors | |
from acme import messages | |
from acme import standalone | |
import josepy as jose | |
# ACME directory URL of the CA (here: a `step-ca` instance).
DIRECTORY_URL = 'https://acme.internal/acme/acme/directory'
# Root certificate of the `step-ca` CA. Used both to verify the ACME server's
# TLS certificate and to verify client certificates presented to our server.
ROOT_CERTIFICATE = "/home/mmalone/.step/certs/root_ca.crt"
# File the issued certificate chain is written to and read from.
CERT_FILE = "cert.pem"
# File the certificate's private key is written to and read from.
KEY_FILE = "key.pem"
# Email address used for creating the ACME account.
ACC_EMAIL = "you@yours.com"
# Account key size, in bits.
ACC_KEY_BITS = 2048
# Certificate private key size, in bits.
CERT_PKEY_BITS = 2048
# Domain name for the certificate.
DOMAIN = 'bar.internal'
# Frequency (in seconds) with which we'll check whether cert should be renewed.
RENEW_FREQUENCY = 15
# Port to listen on for the `http-01` challenge.
ACME_PORT = 80
# Port our HTTPS server will listen on.
HTTPS_PORT = 10443
def new_csr(domain_name, pkey_pem=None):
    """Build a certificate signing request for ``domain_name``.

    When ``pkey_pem`` is ``None`` a fresh RSA key of ``CERT_PKEY_BITS`` bits is
    generated and PEM-encoded first; otherwise the supplied PEM key is used
    unchanged.

    Returns a ``(pkey_pem, csr_pem)`` tuple: the key that was used and the CSR
    produced from it.
    """
    key_pem = pkey_pem
    if key_pem is None:
        rsa_key = OpenSSL.crypto.PKey()
        rsa_key.generate_key(OpenSSL.crypto.TYPE_RSA, CERT_PKEY_BITS)
        key_pem = OpenSSL.crypto.dump_privatekey(
            OpenSSL.crypto.FILETYPE_PEM, rsa_key)
    return key_pem, crypto_util.make_csr(key_pem, [domain_name])
def select_http01_chall(orderr):
    """Return the first `http-01` challenge body offered for ``orderr``.

    Authorizations are scanned in order, and within each one its challenges in
    order; the first entry whose ``chall`` is a ``challenges.HTTP01`` wins.

    Raises ``Exception`` when no authorization offers an `http-01` challenge.
    """
    not_found = object()
    # Lazy search: stops at the first match, just like an early return would.
    chosen = next(
        (
            offered
            for authz in orderr.authorizations
            for offered in authz.body.challenges
            if isinstance(offered.chall, challenges.HTTP01)
        ),
        not_found,
    )
    if chosen is not_found:
        raise Exception('HTTP-01 challenge was not offered by the CA server.')
    return chosen
@contextmanager
def challenge_server(http_01_resources):
    """Manage standalone challenge server set up and shutdown.

    Creates ``standalone.HTTP01DualNetworkedServers`` bound to ``ACME_PORT`` on
    any address, starts them, yields them to the ``with`` body and always shuts
    them down afterwards.

    ``http_01_resources`` is the collection of HTTP-01 resources to serve.
    """
    # Setting up a server that binds at ACME_PORT and any address.
    address = ('', ACME_PORT)
    # Construct outside the ``try``: if binding fails there is nothing to shut
    # down, and the original error must propagate instead of being replaced by
    # a NameError on an unbound ``servers`` in the ``finally`` clause.
    servers = standalone.HTTP01DualNetworkedServers(address, http_01_resources)
    try:
        # The call has to return for the ``yield`` below to be reached, so it
        # is expected to serve in the background.
        servers.serve_forever()
        yield servers
    finally:
        servers.shutdown_and_server_close()
def perform_http01(client_acme, challb, orderr):
    """Answer the HTTP-01 challenge ``challb`` and finalize ``orderr``.

    A standalone challenge server is run for the duration of the exchange.
    Returns the ``fullchain_pem`` of the finalized order.
    """
    answer, expected_body = challb.response_and_validation(client_acme.net.key)
    served = standalone.HTTP01RequestHandler.HTTP01Resource(
        chall=challb.chall, response=answer, validation=expected_body)

    with challenge_server({served}):
        # Tell the CA server we are ready to be validated.
        client_acme.answer_challenge(challb, answer)
        # Wait for the challenge status, then have the certificate issued.
        # (A deadline can be passed to poll_and_finalize if desired.)
        return client_acme.poll_and_finalize(orderr).fullchain_pem
class H(BaseHTTPServer.BaseHTTPRequestHandler):
    """Request handler that greets the caller, by certificate name if known."""

    def do_GET(self):
        """Reply 200 with ``Hello, <name>!``.

        ``<name>`` is the value of the first subjectAltName entry of the
        client certificate. When the client presented no certificate, or the
        certificate information carries no subjectAltName, the generic
        ``Hello, TLS!`` greeting is sent instead.
        """
        # Work out the greeting before anything is written, so a problem with
        # the certificate data cannot leave a half-sent response behind.
        crt = self.connection.getpeercert()
        # getpeercert() may give None or an empty dict, and a certificate is
        # not required to carry a subjectAltName; treat all of these the same.
        alt_names = crt.get('subjectAltName') if crt else None
        if alt_names:
            greeting = u"Hello, %s!\n" % (alt_names[0][1],)
        else:
            greeting = u"Hello, TLS!\n"

        self.send_response(200)
        self.send_header('content-type', 'text/html; charset=utf-8')
        self.end_headers()
        # The header advertises UTF-8, so encode explicitly instead of relying
        # on an implicit text-to-bytes conversion.
        self.wfile.write(greeting.encode('utf-8'))
def serve_https():
    """Obtain a certificate over ACME, serve HTTPS with it and keep it renewed.

    Steps, in order:

    - create an account key and an ACME client that trusts ``ROOT_CERTIFICATE``;
    - register an account (accepting the terms of service);
    - create a private key and CSR for ``DOMAIN`` and get a certificate issued
      through the `http-01` challenge;
    - write the key to ``KEY_FILE`` and the chain to ``CERT_FILE``;
    - start an HTTPS server on ``HTTPS_PORT`` whose client certificates are
      optional but verified against ``ROOT_CERTIFICATE``;
    - every ``RENEW_FREQUENCY`` seconds, check whether two thirds of the
      certificate lifetime have passed and, if so, renew it with the same key.

    Blocks in ``httpd.serve_forever()``; does not return normally.
    """
    # Create account key
    acc_key = jose.JWKRSA(
        key=rsa.generate_private_key(public_exponent=65537,
                                     key_size=ACC_KEY_BITS,
                                     backend=default_backend()))

    # Create client configured to use our ACME server and trust our root cert
    net = client.ClientNetwork(acc_key, user_agent='python-acme-example',
                               verify_ssl=ROOT_CERTIFICATE)
    directory = messages.Directory.from_json(net.get(DIRECTORY_URL).json())
    client_acme = client.ClientV2(directory, net=net)

    # Register account and accept TOS
    client_acme.new_account(
        messages.NewRegistration.from_data(
            email=ACC_EMAIL, terms_of_service_agreed=True))

    # Create domain private key and CSR
    pkey_pem, csr_pem = new_csr(DOMAIN)

    # Issue certificate
    orderr = client_acme.new_order(csr_pem)

    # Select HTTP-01 within offered challenges by the CA server
    challb = select_http01_chall(orderr)

    # The certificate is ready to be used in the variable "fullchain_pem".
    fullchain_pem = perform_http01(client_acme, challb, orderr)
    print(fullchain_pem)

    # O_TRUNC matters: without it, writing a key shorter than an existing
    # KEY_FILE would leave the tail of the old file behind and corrupt the PEM.
    key_fd = os.open(KEY_FILE, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(key_fd, 'w') as keyfile:
        keyfile.write(pkey_pem)
    with open(CERT_FILE, 'w') as certfile:
        certfile.write(fullchain_pem)

    # Create HTTPD and setup TLS context.
    httpd = BaseHTTPServer.HTTPServer(('', HTTPS_PORT), H)

    # Create a shiny new SSLContext... which should really be called a
    # TLSContext, but whatever.
    context = ssl.SSLContext(protocol=ssl.PROTOCOL_TLSv1_2)

    # Make client authentication (client certificates) optional. If a client
    # sends a certificate it will be verified.
    context.verify_mode = ssl.CERT_OPTIONAL

    # Use our CA's root certificate to verify client certificates
    context.load_verify_locations(cafile=ROOT_CERTIFICATE)

    # Use our certificate & private key
    context.load_cert_chain(CERT_FILE, KEY_FILE)

    # Set a servername callback to facilitate renewals. This is called after
    # the TLS Client Hello, to update the context and use the currently valid
    # certificate. In later versions of Python the equivalent hook is the
    # `SSLContext.sni_callback` attribute.
    def sni_callback(sock, name, ctx):
        # Load the latest cert & private key from disk
        ctx.load_cert_chain(CERT_FILE, KEY_FILE)

    context.set_servername_callback(sni_callback)

    def load_leaf_cert():
        """Parse the leaf certificate, which is the first one in CERT_FILE."""
        leaf_pem = str(pem.parse_file(CERT_FILE)[0])
        return x509.load_pem_x509_certificate(leaf_pem, default_backend())

    def schedule_check():
        """Arm a one-shot timer for the next renewal check."""
        timer = threading.Timer(RENEW_FREQUENCY, check_renew)
        # Daemon thread: a pending renewal check must not keep the process
        # alive once the server itself has stopped.
        timer.daemon = True
        timer.start()

    # Renew certificate periodically
    def check_renew():
        try:
            cert = load_leaf_cert()
            # Renew if 2/3 of the way through lifetime
            lifetime = cert.not_valid_after - cert.not_valid_before
            renew_at = cert.not_valid_before + datetime.timedelta(
                seconds=lifetime.total_seconds() * 2.0 / 3.0)
            # The certificate validity times are naive datetimes in UTC, so
            # the current time has to be taken in UTC as well; local time
            # would shift the renewal by the machine's UTC offset.
            now = datetime.datetime.utcnow()
            if now >= renew_at:
                print("Renewing certificate now: %s" % (now,))
                # Reuse the existing private key; only the CSR is rebuilt.
                _, renew_csr = new_csr(DOMAIN, pkey_pem)
                renew_order = client_acme.new_order(renew_csr)
                renew_challb = select_http01_chall(renew_order)
                renewed_pem = perform_http01(
                    client_acme, renew_challb, renew_order)
                with open(CERT_FILE, 'w') as certfile:
                    certfile.write(renewed_pem)
                cert = load_leaf_cert()
                print("Certificate renewed: %s - %s"
                      % (cert.not_valid_before, cert.not_valid_after))
            else:
                print("Waiting to renew at: %s" % (renew_at,))
        finally:
            # Always re-arm, even when a renewal attempt raised; otherwise a
            # single transient failure would stop all future renewals.
            schedule_check()

    schedule_check()

    # Set our TLS socket and serve some HTTPS!
    httpd.socket = context.wrap_socket(httpd.socket, server_side=True)
    httpd.serve_forever()
# Run the example server only when executed as a script, not when imported.
if __name__ == "__main__":
    serve_https()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment