Skip to content

Instantly share code, notes, and snippets.

@fmoor
Last active April 1, 2022 16:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fmoor/fe9a024d8e309c296d72c94c87c123dc to your computer and use it in GitHub Desktop.
Save fmoor/fe9a024d8e309c296d72c94c87c123dc to your computer and use it in GitHub Desktop.
Issue Let's Encrypt certificate using the TLS-ALPN-01 challenge
"""Example issuing TLS certificate using TLS-ALPN-01 challenge.
see also: https://github.com/certbot/certbot/blob/master/acme/examples/http01_example.py
"""
import binascii
import codecs
import contextlib
import os
import threading
import josepy
from acme import client as acme_client
from acme import challenges
from acme import errors
from acme import messages
from acme import standalone
from cryptography.hazmat.primitives.asymmetric import rsa
from OpenSSL import crypto
DOMAIN = 'example.com'
EMAIL = 'user@example.com'
USER_AGENT = 'example'
ACCOUNT_KEY_BITS = 4096
CERT_PKEY_BITS = 4096
# The TLSALPN01 challenge MUST use TCP port 443.
# https://datatracker.ietf.org/doc/html/rfc8737#section-3
PORT = 443
# Let's Encrypt staging URL.
# https://letsencrypt.org/docs/staging-environment/
DIRECTORY_URL = 'https://acme-staging-v02.api.letsencrypt.org/directory'
@contextlib.contextmanager
def challenge_server(domain_key: crypto.PKey, cert: crypto.X509):
address = ('', PORT)
certs = {DOMAIN.encode('utf-8'): (domain_key, cert)}
try:
server = standalone.TLSALPN01Server(
address, certs=list(certs.values()), challenge_certs=certs)
thread = threading.Thread(target=server.serve_forever)
thread.start()
yield server
finally:
try:
server.shutdown()
server.server_close()
thread.join()
except NameError:
pass
def new_csr(domain_key: crypto.PKey, domain: str) -> crypto.X509Req:
csr = crypto.X509Req()
csr.add_extensions([
crypto.X509Extension(
b'subjectAltName',
critical=False,
value=f'DNS:{domain}'.encode('ascii')
),
])
csr.set_pubkey(domain_key)
csr.set_version(2)
csr.sign(domain_key, 'sha256')
return csr
class ChallengeUnavailable(Exception):
pass
def select_tlsalpn01(order: messages.OrderResource) -> messages.ChallengeBody:
for auth in order.authorizations:
for challenge in auth.body.challenges:
if isinstance(challenge.chall, challenges.TLSALPN01):
return challenge
raise ChallengeUnavailable('TLS-ALPN-01 challenge was not offered by the CA server.')
def gen_self_signed_cert(
response: challenges.TLSALPN01Response,
domain_key: crypto.PKey
) -> crypto.X509:
"""Generate a self signed certificate for TLSALPN01 validation.
Reference: https://datatracker.ietf.org/doc/html/rfc8737#section-3
This functionality is currently broken in acme==1.25.0
"""
cert = crypto.X509()
cert.set_serial_number(int(binascii.hexlify(os.urandom(16)), 16))
cert.set_version(2)
cert.get_subject().CN = DOMAIN
cert.set_issuer(cert.get_subject())
cert.add_extensions([
crypto.X509Extension(
b'subjectAltName',
critical=False,
value=b'DNS:' + DOMAIN.encode('utf-8'),
),
crypto.X509Extension(
b'1.3.6.1.5.5.7.1.31',
critical=True,
value=b'DER:04:20:' + codecs.encode(response.h, 'hex'),
)
])
cert.gmtime_adj_notBefore(0)
cert.gmtime_adj_notAfter(7 * 24 * 60 * 60)
cert.set_pubkey(domain_key)
cert.sign(domain_key, "sha256")
return cert
def perform_tlsalpn01(
client: acme_client.ClientV2,
challenge_body: messages.ChallengeBody,
order: messages.OrderResource,
domain_key: crypto.PKey
) -> str:
response = challenge_body.response(client.net.key)
csr = gen_self_signed_cert(response, domain_key)
with challenge_server(domain_key, csr):
client.answer_challenge(challenge_body, response)
finalized_order = client.poll_and_finalize(order)
return finalized_order.fullchain_pem
def issue_certificate(
client: acme_client.ClientV2,
domain_key: crypto.PKey
) -> str:
"""Issue a certificate using the TLSALPN01 challenge."""
csr = new_csr(domain_key, DOMAIN)
csr_pem = crypto.dump_certificate_request(crypto.FILETYPE_PEM, csr)
order = client.new_order(csr_pem)
challenge_body = select_tlsalpn01(order)
try:
return perform_tlsalpn01(client, challenge_body, order, domain_key)
except errors.ValidationError as e:
print(e.failed_authzrs)
raise
def main():
account_key = josepy.JWKRSA(
key=rsa.generate_private_key(public_exponent=65537, key_size=ACCOUNT_KEY_BITS))
# Register account and accept TOS
network = acme_client.ClientNetwork(account_key, user_agent=USER_AGENT)
directory = messages.Directory.from_json(network.get(DIRECTORY_URL).json())
client = acme_client.ClientV2(directory, network)
account = client.new_account(messages.NewRegistration.from_data(
email=EMAIL,
terms_of_service_agreed=True,
))
# Create domain private key
domain_key = crypto.PKey()
domain_key.generate_key(crypto.TYPE_RSA, CERT_PKEY_BITS)
fullchain_pem = issue_certificate(client, domain_key)
print(fullchain_pem)
if __name__ == '__main__':
main()
acme==1.25.0
certifi==2021.10.8
cffi==1.15.0
charset-normalizer==2.0.12
cryptography==36.0.2
idna==3.3
josepy==1.13.0
pycparser==2.21
pyOpenSSL==22.0.0
pyRFC3339==1.1
pytz==2022.1
requests==2.27.1
requests-toolbelt==0.9.1
urllib3==1.26.9
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment