Skip to content

Instantly share code, notes, and snippets.


mmalone/ Secret

Last active Jul 12, 2020
What would you like to do?
Example of obtaining a certificate using ACME and serving HTTPS in Python
"""Example HTTPS server that obtains a certificate using ACME `http-01`
challenge and auto-renews ahead of expiry.
The workflow consists of:
(Account creation)
- Create account key
- Register account and accept TOS
(Certificate actions)
- Select HTTP-01 within offered challenges by the CA server
- Set up http challenge resource
- Set up standalone web server
- Create domain private key and CSR
- Issue certificate
- Start HTTPS server
- Renew certificate periodically and update server
Things that could be improved:
- Don't write cert/key to disk (ssl module makes this hard)
- Better timer loop that cancels on SIGKILL
- Locking or some form of synchronization between writing the CERT_FILE and reading it
- Caching or something so cert isn't loaded with every connection
import BaseHTTPServer
import datetime
import OpenSSL
import os
import pem
import signal
import ssl
import threading
from contextlib import contextmanager
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
from acme import challenges
from acme import client
from acme import crypto_util
from acme import errors
from acme import messages
from acme import standalone
import josepy as jose
# This is the ACME Directory URL for `step-ca`
DIRECTORY_URL = 'https://acme.internal/acme/acme/directory'
# This is the root certificate for `step-ca`
ROOT_CERTIFICATE = "/home/mmalone/.step/certs/root_ca.crt"
# File to read/write cert
CERT_FILE = "cert.pem"
# File to read/write key
KEY_FILE = "key.pem"
# Email address used for creating ACME acount
# Account key size
# Certificate private key size
# Domain name for the certificate.
DOMAIN = 'bar.internal'
# Frequency (in seconds) with which we'll check whether cert should be renewed
# Port to listen on for `http-01` challenge
# Port our HTTPS server will listen on
HTTPS_PORT = 10443
def new_csr(domain_name, pkey_pem=None):
"""Create certificate signing request."""
if pkey_pem is None:
pkey = OpenSSL.crypto.PKey()
pkey.generate_key(OpenSSL.crypto.TYPE_RSA, CERT_PKEY_BITS)
pkey_pem = OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, pkey)
csr_pem = crypto_util.make_csr(pkey_pem, [domain_name])
return pkey_pem, csr_pem
def select_http01_chall(orderr):
"""Look for and select `http-01` from the challenges offered by the server."""
for authz in orderr.authorizations:
for i in authz.body.challenges:
if isinstance(i.chall, challenges.HTTP01):
return i
raise Exception('HTTP-01 challenge was not offered by the CA server.')
def challenge_server(http_01_resources):
"""Manage standalone server set up and shutdown."""
# Setting up a server that binds at PORT and any address.
address = ('', ACME_PORT)
servers = standalone.HTTP01DualNetworkedServers(address, http_01_resources)
yield servers
def perform_http01(client_acme, challb, orderr):
"""Set up standalone webserver and perform HTTP-01 challenge."""
response, validation = challb.response_and_validation(
resource = standalone.HTTP01RequestHandler.HTTP01Resource(
chall=challb.chall, response=response, validation=validation)
with challenge_server({resource}):
# Let the CA server know that we are ready for the challenge.
client_acme.answer_challenge(challb, response)
# Wait for challenge status and then issue a certificate.
# It is possible to set a deadline time.
finalized_orderr = client_acme.poll_and_finalize(orderr)
return finalized_orderr.fullchain_pem
class H(BaseHTTPServer.BaseHTTPRequestHandler):
def do_GET(self):
self.send_header('content-type', 'text/html; charset=utf-8')
crt = self.connection.getpeercert()
if crt is None:
self.wfile.write("Hello, TLS!\n")
san = crt.get('subjectAltName')[0][1]
self.wfile.write("Hello, "+san+"!\n")
def serve_https():
# Create account key
acc_key = jose.JWKRSA(
# Create client configured to use our ACME server and trust our root cert
net = client.ClientNetwork(acc_key, user_agent='python-acme-example', verify_ssl=ROOT_CERTIFICATE)
directory = messages.Directory.from_json(net.get(DIRECTORY_URL).json())
client_acme = client.ClientV2(directory, net=net)
# Register account and accept TOS
regr = client_acme.new_account(
email=ACC_EMAIL, terms_of_service_agreed=True))
# Create domain private key and CSR
pkey_pem, csr_pem = new_csr(DOMAIN)
# Issue certificate
orderr = client_acme.new_order(csr_pem)
# Select HTTP-01 within offered challenges by the CA server
challb = select_http01_chall(orderr)
# The certificate is ready to be used in the variable "fullchain_pem".
fullchain_pem = perform_http01(client_acme, challb, orderr)
print fullchain_pem
with os.fdopen(, os.O_WRONLY | os.O_CREAT, 0o600), 'w') as keyfile:
with open(CERT_FILE, 'w') as certfile:
# Create HTTPD and setup TLS context.
httpd = BaseHTTPServer.HTTPServer(('', HTTPS_PORT), H)
# Create a shiny new SSLContext... which should really be called a
# TLSContext, but whatever.
context = ssl.SSLContext(protocol=ssl.PROTOCOL_TLSv1_2)
# Make client authentication (client certificates) optional. If a client
# sends a certificate it will be verified.
context.verify_mode = ssl.CERT_OPTIONAL
# Use our CA's root certificate to verify client certificates
# Use our certificate & private key
context.load_cert_chain(CERT_FILE, KEY_FILE)
# Set a servername callback to facilitate renewals. This is is called after
# the TLS Client Hello handshake, to update the SSLSocket.context and use
# the currently valid certificate. In later versions of Python this is
# called sni_callback.
def sni_callback(sock, name, ctx):
# Load the latest cert & private key from disk
ctx.load_cert_chain(CERT_FILE, KEY_FILE)
#context.sni_callback = sni_callback
# Renew certificate periodically
def check_renew():
global t
# Leaf cert is the first in the file
cert_pem = str(pem.parse_file(CERT_FILE)[0])
cert = x509.load_pem_x509_certificate(cert_pem, default_backend())
# Renew if 2/3 of the way through lifetime
renewAfter = (cert.not_valid_after - cert.not_valid_before).total_seconds() * 2.0/3.0
renewAt = cert.not_valid_before + datetime.timedelta(seconds=renewAfter)
if >= renewAt:
print "Renewing certificate now: %s" % (,)
_, csr_pem = new_csr(DOMAIN, pkey_pem)
orderr = client_acme.new_order(csr_pem)
challb = select_http01_chall(orderr)
fullchain_pem = perform_http01(client_acme, challb, orderr)
with open(CERT_FILE, 'w') as certfile:
cert_pem = str(pem.parse_file(CERT_FILE)[0])
cert = x509.load_pem_x509_certificate(cert_pem, default_backend())
print "Certificate renewed: %s - %s" % (cert.not_valid_before, cert.not_valid_after)
print "Waiting to renew at: %s" % (renewAt,)
threading.Timer(RENEW_FREQUENCY, check_renew).start()
threading.Timer(RENEW_FREQUENCY, check_renew).start()
# Set our TLS socket and serve some HTTPS!
httpd.socket = context.wrap_socket(httpd.socket, server_side=True)
if __name__ == "__main__":
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.