Skip to content

Instantly share code, notes, and snippets.

@adiroiban
Created July 18, 2021 11:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save adiroiban/b895a62b4d28197dfdc8b803aa23ab0c to your computer and use it in GitHub Desktop.
Save adiroiban/b895a62b4d28197dfdc8b803aa23ab0c to your computer and use it in GitHub Desktop.
chevah txacme
# Copyright (c) 2018 Adi Roiban.
# See LICENSE for details.
"""
Act as ACME client to generate and renew SSL certificates from Let's Encrypt.
The certificates are then set to each SSL service and the service
is restarted if required.
The task of updating the service configuration is done by the
certificate storage.
"""
from zope.interface import implementer, Interface
import pem
from OpenSSL import SSL
from twisted.internet import defer
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.python.url import URL
from twisted.web import http
from twisted.web.iweb import IResponse
# Let's Encrypt is not available everywhere
try:
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from josepy.jwa import RS256
from josepy.jwk import JWKRSA
from txacme.client import Client, JWSClient
from acme.errors import ClientError
from txacme.interfaces import ICertificateStore, IResponder
from txacme.service import AcmeIssuingService
except ImportError:
# Placeholders for when we don't have the lets encrypt libraries.
JWSClient = None
ICertificateStore = Interface
IResponder = Interface
AcmeIssuingService = object
from chevah.server import force_unicode
from chevah.server.commons import DEFAULT_BACKEND
from chevah.server.commons.exception import ServerException
from chevah.server.commons.interface import (
ILetsEncryptClientConfiguration,
ILetsEncryptClientResource,
ISSLOptions,
)
from chevah.server.commons.runnable import Runnable
from chevah.server.commons.service import listen
from chevah.server.configuration.model import (
WritableBoolean,
WritableString,
WritableStringOrNone,
)
from chevah.server.configuration.option import (
AddressPortOptionsMixin,
)
from chevah.server.http.client import PersistentAgent
from chevah.server.http.resource import (
RedirectResource,
StaticTextResource,
)
from chevah.server.http.factory import HTTPFactoryBase
from chevah.server.http.configuration import HTTPServiceConfigurationSection
from chevah.server.resource.configuration import _ResourceConfigurationBase
# Self-signed certificate and private key returned for the domains for which
# we don't yet have a certificate.
# It was generated with:
# openssl req -x509 -newkey rsa:2048 -nodes \
# -keyout key.pem -out cert.pem -days 7300
# It has CN = "loading-lets-encrypt".
_PEMS_PLACEHOLDER = u"""
-----BEGIN CERTIFICATE-----
MIIDLjCCAhagAwIBAgIJAPcHtbsKQK/BMA0GCSqGSIb3DQEBCwUAMCwxCzAJBgNV
BAYTAkdCMR0wGwYDVQQDDBRsb2FkaW5nLWxldHMtZW5jcnlwdDAeFw0xODExMTgy
MjIyNTFaFw0zODExMTMyMjIyNTFaMCwxCzAJBgNVBAYTAkdCMR0wGwYDVQQDDBRs
b2FkaW5nLWxldHMtZW5jcnlwdDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoC
ggEBAMBTFjLB7moRizureD2u8l42ZjG3/y/iDjQdV7EQeEWKWF/8QusaTmBBXRvS
e8Rz7p0aRfdiZS80Kp2h8Wbdk9BY/R1OmpHsaofCNL00s5cF345wFIPiIjFubI2X
9SZ6WmhiMA8CvxDu9mCJBoJf7jRJn5gBQVIE2jSSap8lVvyVsJp7G8+MV2PL9Ax1
GinDXN6r+zdnjG2sVx1pFDShuGNCnD6l+RQToUm771t5kAuE6usx5FICcxu+Y18h
AFza4g15cdSN4ql4hOc2M1jeqlhwYPGSyZVi3OyDpiQGlVQE2N3PwFue9d7pHu/A
gS3ROWygoxjgyRC4kiobsgHqMdsCAwEAAaNTMFEwHQYDVR0OBBYEFBVQLqOzkoBb
MF9xs9rEMKfGhqYrMB8GA1UdIwQYMBaAFBVQLqOzkoBbMF9xs9rEMKfGhqYrMA8G
A1UdEwEB/wQFMAMBAf8wDQYJKoZIhvcNAQELBQADggEBAKn3xdAH+U6RkfMQXn0F
6GF3oDcL9y0Pgr6twwgzR+XI3FzdJrLz2FgNbxyAB2pJsi+luDJ/b4Qu6ba+Dwft
LUvPtcR+4OpcINANOnsRbmqQnlURxbEZgm6A8rgQHlec6yMTNV2O+I8ccvhd3ERK
mpn5UlFMuR3L5UWzt7kYu5nXvchNdk2C41kj+coKy5rUyVe70162qXei+hB3bglf
6kT8BMQYJty8sFLIcsHa27Y7icXGNTZTtRNojh5/gkJNzn5nwMwySAfn+SE+qRsB
L8NLVJ3C/2kq31I1yRDl3j7smctr+1B3sLqLzhW9wYifCYYYGOaRsE676akX4/LH
Zlo=
-----END CERTIFICATE-----
-----BEGIN PRIVATE KEY-----
MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQDAUxYywe5qEYs7
q3g9rvJeNmYxt/8v4g40HVexEHhFilhf/ELrGk5gQV0b0nvEc+6dGkX3YmUvNCqd
ofFm3ZPQWP0dTpqR7GqHwjS9NLOXBd+OcBSD4iIxbmyNl/UmelpoYjAPAr8Q7vZg
iQaCX+40SZ+YAUFSBNo0kmqfJVb8lbCaexvPjFdjy/QMdRopw1zeq/s3Z4xtrFcd
aRQ0obhjQpw+pfkUE6FJu+9beZALhOrrMeRSAnMbvmNfIQBc2uINeXHUjeKpeITn
NjNY3qpYcGDxksmVYtzsg6YkBpVUBNjdz8BbnvXe6R7vwIEt0TlsoKMY4MkQuJIq
G7IB6jHbAgMBAAECggEAYw6oYU5HmPQeTYZ8sPCZvvKv4i3NzwDUpfzy/Kzp9Y9T
A2Uvpl3vPl6MMVdRDhdUMDCnFsrkKNLqnDQb3QqFJoufGugwbrZhDDl35nftg/yv
b3LE7kSbrfAdEzTQPJhKVnc0H5cfh/48ge4xnxb/3oiJHcaPuUnshaVgK0YVLpSZ
Di2+5u6jSK9vDba54/IEc2IsnfEA+Uu6JEBSZYWKPK2vI7jVutU3j52N9QQV3Ua2
YSOdOv4nehF3Uqv0YZUoydfkl5OQBMK3TtrRBL9qQCH7vcIpTRcvhXqmBRTuvyAd
xKXsDzcrvK2NMe+DCMJ650cEyIzdUWylARs51nX9YQKBgQD2ABPmMVAxlpmYbXfC
V/WY3m7VnUmvjzumKx2+6OByzgwnCv2Gigr+C+YuQ9TMgl2SPQvoz/ltMf+LU7iL
mth2hs1z4i6bUsVF9QxRe8mkPVPTaASNGXENoQG/2uK2AIi5FE1czdALE1w16d1U
otMpw+808FqhtMxygfsZzubW6wKBgQDIJHMjeM2zB4z1hfbBaEejdGYxGy1w6kQZ
5wdGx/1uL698U+Xly/RuRKUDxlHDxpSJaLN/jKhYct+S4mvxwRhjY2p8MfDMEJw8
Pd/VHEq/1d4F85D9I+Hg6k8NXlCwGYdL2jJVcF9ui+zwyv/Xi9plWHRNZA3TdkUJ
M8VZ4ic00QKBgAdKHUzW7T9q7QY9SC00Ggz7mmEuFf7jyaq04F7F29DLFkUZ6EVS
Vd0NUTbRv76Hpmos8OtnLkb0ElR4mKFaZ0ur1u62JxdnLn1SM5k+h80cHB3JmUjt
FhKHUNv0in9GKMcDOP+lAaMyYasfUPWvFX7JVY9GCAl+qAhEncI4BVMdAoGBALEb
YDABlLVadyhFdchMiShhtpS54gpLpBvvzwaZrvA0jmvMqmIhi5mQcR3X/z7pmUPH
PSAfzlEGxkVqy+7Q2s4IiZCBeP57rGW6szoYoSUFEkXd9W2stROdBHtl/Kz4yjsb
SPNGT4y5keC7Io8RGSAZmEFryrhXrluoTnltoRghAoGAPtJkcqgx710XD6hxgFEy
G9JR+g9l+NGcK7SSvqRBfQMFZC4KjfziSJVxI2rfZkuDSN4qccprtQQsYIbKzQs3
FJLYQdefBfHAMRuYokvg7qgb5anzkAM61Q/MIq9lnlRM4Kik4NDivMdS1vh6ClHt
kt/kWXaJ2YYyqk9b5YhMhzU=
-----END PRIVATE KEY-----
"""
@implementer(ILetsEncryptClientConfiguration)
class LetsEncryptClientConfiguration(
        _ResourceConfigurationBase,
        AddressPortOptionsMixin,
        HTTPServiceConfigurationSection,
        ):
    """
    Configuration section for the Let's Encrypt resource.

    It is a resource configuration which also has all the basic HTTP
    server options.
    """

    PORT_DEFAULT = 80

    # HTTPServiceConfigurationSection is messing with proxy.
    _proxy = None

    # The resources are started on demand, but for Let's Encrypt we want
    # to be a service and a resource.
    enabled = WritableBoolean('enabled')
    debug = WritableBoolean('debug')

    # Public options.
    acme_url = WritableString('acme_url')
    contact_email = WritableStringOrNone('contact_email')
    redirect_url = WritableString('redirect_url')

    # Private options: the PEM of the ACME account key and the serialized
    # content of the certificate store.
    _account_key = WritableString('_account_key')
    _storage = WritableString('_storage')

    @property
    def use_ssl(self):
        """
        See `IServiceConfiguration`.

        Always `False` for this configuration.
        """
        return False
# FIXME:2599:
# This should be removed once service configuration is flat.
class _ServiceConfgurationProxy(object):
    """
    Simulate the service configuration in which the configuration is found
    at 2 levels: base and base.configuration.

    For Let's Encrypt we only have `base`, so `parent` returns the wrapped
    configuration and any other attribute is read from the wrapped
    configuration.
    """

    # Attributes which are provided by the proxy itself and are never
    # delegated to the wrapped configuration.
    _OWN_ATTRIBUTES = ['parent']

    def __init__(self, context):
        self._me = context

    def __getattr__(self, attr):
        """
        Return the value of `attr` from the wrapped configuration.

        This is called only after the normal lookup on the proxy has
        failed.

        Raises `AttributeError` for `_me` and for the own attributes.
        Reaching this point for any of them means that the proxy is not yet
        initialized, and looking them up again on the proxy would recurse
        without end.
        """
        if attr == '_me' or attr in self._OWN_ATTRIBUTES:
            raise AttributeError(attr)
        return getattr(self._me, attr)

    @property
    def parent(self):
        """
        The wrapped configuration, acting as its own parent.
        """
        return self._me
@implementer(ILetsEncryptClientResource)
class LetsEncryptClientResource(Runnable):
    """
    Manage certificate issued by Let's Encrypt.

    It uses the configured account key to register/update an account with
    that key.

    It is an HTTP server for HTTP-01 challenge and ACME client.

    The certificate storage is available even when the resource is stopped.
    """

    PROPERTIES_REQUIRING_RESTART = (
        HTTPServiceConfigurationSection.PROPERTIES_REQUIRING_RESTART + (
            'acme_url',
            'address',
            'port',
            'redirect_url',
        )
    )

    # Size of the account key when auto-generated.
    _KEY_SIZE = 2048

    # `None` when the ACME libraries could not be imported.
    # It is used both to check the availability and to create the client.
    _JWSClient = JWSClient

    def __init__(self, parent, configuration):
        super(LetsEncryptClientResource, self).__init__(parent, configuration)
        self._resetState()

    def _resetState(self):
        """
        Reset the internal state.

        A new HTTP factory and a new certificate store are created on each
        call.
        """
        # The listening port of the HTTP server, set once it is started.
        self._http_server = None
        self._agent = None
        # The high level ACME service taking care of updating the certs.
        self._acme_service = None
        # The acme client used to make high level requests.
        self._client = None
        # The HTTP Server.
        self._factory = LetsEncryptHTTPFactory(
            _ServiceConfgurationProxy(self.configuration),
            on_error=self.fail,
        )
        # The storage for the domain certificates.
        self._store = CertificateStore(
            self.configuration,
            on_update=self._onUpdate,
            domains_getter=self._getConfiguredDomains,
        )

    def _onStart(self, avatar=None):
        """
        See: Runnable.

        Start the HTTP-01 challenge server and, once it listens, the ACME
        client.

        Raises `ServerException` when the ACME libraries are not available
        or when the ACME URL is not configured.
        """
        self._resetState()

        if self._JWSClient is None:
            raise ServerException(
                'Let\'s Encrypt is not yet supported on this OS.')

        if not self.configuration.acme_url:
            raise ServerException(
                'Missing URL to the Let\'s Encrypt Server Directory.')

        endpoint = TCP4ServerEndpoint(
            reactor=self._scheduler,
            interface=self.configuration.address,
            port=self.configuration.port,
        )
        deferred = listen(endpoint, self._factory, self.configuration.port)
        # The errback raises, so the callback is called only on success.
        deferred.addErrback(self._ebHTTPServerStarted)
        deferred.addCallback(self._cbHTTPServerStarted)
        return deferred

    def _ebHTTPServerStarted(self, failure):
        """
        Called when failing to start the HTTP server.
        """
        raise ServerException(
            u'Failed to start the local HTTP-01 challenge server. %s' % (
                force_unicode(failure.value),))

    def _cbHTTPServerStarted(self, port):
        """
        Called when HTTP server was started.
        """
        self._http_server = port
        return self._startACMEClient()

    def _startACMEClient(self):
        """
        Start the ACME client service.

        Return a deferred which fails with `ServerException` when the ACME
        server can't be discovered or when the ACME service fails to start.
        """
        acme_key = self._getAccountKey()
        self._agent = PersistentAgent(
            event_emitter=self.emitEvent,
            ssl_context=SSL.Context(SSL.SSLv23_METHOD),
        )
        # Use the class member, as this is the one checked at start.
        jws_client = self._JWSClient(
            agent=self._agent, key=acme_key, alg=RS256)

        def get_message(failure):
            """
            Return the best human readable message we can get out of failure.
            """
            error = failure.value
            # An exception created without arguments has no details to
            # extract, so the exception itself is used.
            args = error.args
            if args and failure.check(ClientError):
                error = args[0]

            if args and IResponse.providedBy(args[0]):
                response = args[0]
                error = 'Server responded %s:%s' % (
                    response.code, response.phrase)

            return force_unicode(error)

        def cb_client_creation(client):
            """
            Called when the client has validated the ACME server.
            """
            self._client = client
            self._acme_service = AcmeIssuingService(
                cert_store=self._store,
                client=self._client,
                email=self.configuration.contact_email,
                clock=self._scheduler,
                responders=[self._factory.responder],
                panic=self._onPanic,
            )
            deferred = self._acme_service.start()
            deferred.addErrback(eb_service_start)
            return deferred

        def eb_client_creation(failure):
            """
            Called when failing to create the client.
            """
            raise ServerException('Failed to discover the ACME server. %s' % (
                get_message(failure),))

        def eb_service_start(failure):
            """
            Called when the service fails to start.
            """
            raise ServerException('Failed to register the ACME server. %s' % (
                get_message(failure),))

        # To simplify the upgrade, we try to automatically migrate from
        # public V1 to V2.
        acme_url = self.configuration.acme_url
        if acme_url.lower() == 'https://acme-v1.api.letsencrypt.org/directory':
            acme_url = u'https://acme-v02.api.letsencrypt.org/directory'
            self.configuration.acme_url = acme_url

        deferred = Client.from_url(
            self._scheduler,
            URL.fromText(acme_url),
            key=acme_key,
            alg=RS256,
            jws_client=jws_client,
        )
        # The errback raises, so the callback is called only on success.
        deferred.addErrback(eb_client_creation)
        deferred.addCallback(cb_client_creation)
        return deferred

    @defer.inlineCallbacks
    def _onStop(self, avatar=None):
        """
        See: Runnable.

        Each part is released only when it was started and is then
        forgotten, so that it is not stopped a second time.
        """
        if self._agent:
            yield self._agent.closePersistentConnections()
            self._agent = None

        if self._acme_service:
            yield self._acme_service.stopService()
            self._acme_service = None

        if self._http_server:
            yield self._http_server.stopListening()
            self._http_server = None

    def getCertificatePEMs(self, name, sans=None):
        """
        Return a PEM serialized of all the certificate objects for `name`.

        This include cert itself, key and cert chain.

        `sans` is an optional sequence of alternative names which are part
        of the same certificate.

        This will always return a certificate.
        If no certificate is yet found, it will return a self-signed one.
        """
        # Is important to join using a comma and space, as this is the
        # normalized name of the certificate.
        certificate_name = u', '.join([name] + list(sans or []))

        def cb_store_get(pem_objects):
            """
            Called when we got valid object in the cert storage.
            """
            if not pem_objects:
                # No certificate yet...but should get one soon.
                return _PEMS_PLACEHOLDER
            return u'\n'.join(o.as_bytes() for o in pem_objects)

        def eb_store_get(failure):
            """
            Called when `name` is not in the certificate storage.
            """
            failure.trap(ServerException)

            if self._acme_service:
                # Service is started.
                # Trigger a new cert create. Don't wait for it.
                # Make sure errors are handled.
                deferred = self._acme_service.issue_cert(certificate_name)
                deferred.addErrback(
                    lambda failure: self._onPanic(failure, certificate_name))
                # Here to help with testing.
                self._ongoing_new_cert = deferred

            return _PEMS_PLACEHOLDER

        deferred = self._store.get(certificate_name)
        deferred.addCallback(cb_store_get)
        deferred.addErrback(eb_store_get)
        # This is a hack as `getCertificatePEMs` is a sync call.
        # But by the time we are here, we should already have a result: the
        # real one or the placeholder.
        return deferred.result

    def _getDomainsConfiguration(self):
        """
        Return a dict with mapping of the configured domains to the targeted
        services.

        The key is the normalized certificate name and the value is the
        list of services using that certificate.
        """
        result = {}
        for service in self.root.services.getAll():
            configuration = service.configuration.configuration
            if not ISSLOptions.providedBy(configuration):
                continue

            if not configuration.ssl_domains:
                continue

            # Is important to join using a comma and space, as this is the
            # normalized name of the certificate.
            domains = ', '.join(configuration.ssl_domains)
            result.setdefault(domains, []).append(service)

        return result

    def _getConfiguredDomains(self):
        """
        Return the list of the domains configured for the services and
        which need to be managed by the resource.
        """
        return list(self._getDomainsConfiguration().keys())

    def _onUpdate(self, certificate_name, pem_objects):
        """
        Called when we got updated certificates/keys for a certificate.

        The new value is set for each service using the certificate and
        the service is restarted when it requires a restart.
        """
        services = self._getDomainsConfiguration().get(certificate_name, [])
        if not services:
            return

        # The same value is set for all the services, so it is serialized
        # only once.
        update = ''.join(o.as_bytes() for o in pem_objects)

        for service in services:
            # The service runnable has a service configuration which
            # has the configuration section for the protocol.
            # We don't save the configuration as it is saved
            # in another call.
            service.configuration.configuration.ssl_certificate = update
            service.configuration.configuration.ssl_key = ''
            self.emitEvent(
                '20016',
                data={'domains': certificate_name, 'service': service.name}
            )

            if not service.restart_required:
                # We either got the same cert,
                # but most probably the service is stopped.
                continue

            service.restart()

    def _getAccountKey(self):
        """
        Return the JWK key used by the acme client account.

        The key is loaded from the configuration.
        When the configuration has no key, a new one is generated and saved
        to the configuration.

        This can take some time to run and will block the server when
        generating a new key, but we expect that the key is not generated
        often so for now, don't bother with threading.

        Raises `ServerException` when the key can't be loaded or can't be
        generated and saved.
        """
        content = self.configuration._account_key

        if content:
            try:
                key = serialization.load_pem_private_key(
                    content.encode('ascii'),
                    password=None,
                    backend=DEFAULT_BACKEND,
                )
            except Exception as error:
                raise ServerException(
                    'Failed to load the account key. %s' % (error,))
        else:
            try:
                key = rsa.generate_private_key(
                    public_exponent=65537,
                    key_size=self._KEY_SIZE,
                    backend=DEFAULT_BACKEND,
                )
                account_key = key.private_bytes(
                    encoding=serialization.Encoding.PEM,
                    format=serialization.PrivateFormat.TraditionalOpenSSL,
                    encryption_algorithm=serialization.NoEncryption(),
                )
                # Saved on the same configuration from which it is read.
                self.configuration._account_key = account_key
                self.configuration._proxy.save()
            except Exception as error:
                raise ServerException(
                    'Failed to generate and save the account key. %s' % (
                        error,))

        return JWKRSA(key=key)

    def _onPanic(self, failure, certificate_name):
        """
        Called when failing to get a certificate for `certificate_name`.

        It only emits an event with the details of the failure.
        """
        if failure.check(defer.FirstError):
            # Report the failure which triggered the FirstError.
            failure = failure.value.subFailure

        self.emitEvent(
            '20017',
            data={
                'domains': certificate_name,
                'details': force_unicode(failure.value),
            },
        )
class LetsEncryptHTTPFactory(HTTPFactoryBase):
    """
    Minimal HTTP site/factory used to answer the Let's Encrypt HTTP-01
    challenges.
    """

    def __init__(self, service_configuration, on_error):
        # NOTE(review): the responder is created before the base class is
        # initialized, presumably because the base initialization needs
        # the root resource - confirm.
        self._responder = HTTP01ChevahResponder()
        super(LetsEncryptHTTPFactory, self).__init__(
            service_configuration, on_error)

    @property
    def responder(self):
        """
        The resource answering to the HTTP-01 challenges.
        """
        return self._responder

    def getRootResource(self):
        """
        Return the root, which only has .well-known/acme-challenge/ as
        child.

        The root is a permanent redirection when `redirect_url` is
        configured, otherwise it is an empty page.
        """
        target = self.service_configuration.redirect_url
        if not target:
            root = StaticTextResource('')
        else:
            root = RedirectResource(
                url=target, code=http.MOVED_PERMANENTLY)

        acme_parent = StaticTextResource('')
        root.putChild('.well-known', acme_parent)
        acme_parent.putChild('acme-challenge', self._responder)
        return root
@implementer(IResponder)
class HTTP01ChevahResponder(StaticTextResource):
    """
    Web resource answering to the ``http-01`` challenges.

    Any page which is not a challenge page is empty.
    """

    challenge_type = u'http-01'

    def __init__(self):
        super(HTTP01ChevahResponder, self).__init__('')
        # Add a static response to help with connection troubleshooting.
        ready_page = StaticTextResource('Let\'s Encrypt Ready')
        self.putChild(b'test.txt', ready_page)

    @staticmethod
    def _getTokenPath(challenge):
        """
        Return the name of the child resource used for `challenge`.
        """
        return challenge.encode('token').encode('utf-8')

    def start_responding(self, server_name, challenge, response):
        """
        Publish the key authorization of `response` so that the ACME server
        can validate the challenge.
        """
        path = self._getTokenPath(challenge)
        content = StaticTextResource(response.key_authorization)
        self.putChild(path, content)

    def stop_responding(self, server_name, challenge, response):
        """
        Remove the child resource of the challenge, if there is one.
        """
        path = self._getTokenPath(challenge)
        if self.getStaticEntity(path) is None:
            return
        self.delEntity(path)
@implementer(ICertificateStore)
class CertificateStore(object):
    """
    Certificate store which persists all the certificates inside a single
    configuration value.

    A certificate with multiple domains is stored under a name made of the
    comma separated list of domains (with a space after the comma).

    Requesting a name without a certificate results in a
    `ServerException` failure.
    """

    def __init__(self, configuration, on_update, domains_getter):
        self._storage = {}
        self._configuration = configuration
        self._onUpdate = on_update
        self._getConfiguredDomains = domains_getter
        self._load()

    def _getNormalizedName(self, name):
        """
        Return `name` as a list of domains separated by comma and space,
        without the empty parts.
        """
        stripped = (part.strip() for part in name.split(','))
        return ', '.join(part for part in stripped if part)

    def as_dict(self):
        """
        Return a deferred with a copy of the managed certificates,
        indexed by the certificate name.

        Only the names which are still configured are included, so that
        certificates managed in the past are not scheduled for renewal
        when a new check is triggered.
        """
        self._refresh()
        snapshot = self._storage.copy()
        return defer.succeed(snapshot)

    def get(self, certificate_name):
        """
        Return a deferred with the X.509 objects of `certificate_name`.

        The deferred fails with `ServerException` when there are no objects
        for the name.
        """
        normalized = self._getNormalizedName(certificate_name)
        try:
            pem_objects = self._storage[normalized]
        except KeyError:
            return defer.fail(ServerException(
                'No certificate for "%s".' % (normalized,)))
        return defer.succeed(pem_objects)

    def store(self, certificate_name, pem_objects):
        """
        Remember the X.509 objects of `certificate_name`, save them to the
        configuration and notify the update handler.
        """
        normalized = self._getNormalizedName(certificate_name)
        self._storage[normalized] = pem_objects
        self._save()
        self._onUpdate(normalized, pem_objects)
        return defer.succeed(None)

    def _refresh(self):
        """
        Align the stored names with the names which are now configured.
        """
        names = self._getConfiguredDomains()
        known = list(self._storage.keys())

        # Forget the names which are no longer handled.
        for stale in [name for name in known if name not in names]:
            del self._storage[stale]

        # Register the new names, without any object.
        for name in names:
            if name in known:
                continue
            self._storage[name] = []

    def _loadEntry(self, cert_names, pem_lines):
        """
        Parse `pem_lines` as the objects of `cert_names`.

        Does nothing when there is no certificate name.
        """
        if not cert_names:
            return
        self._storage[cert_names] = pem.parse('\n'.join(pem_lines))

    def _load(self):
        """
        Fill the storage with the PEM objects persisted in the
        configuration.

        The first line of the persisted value is the ACME URL for which the
        certificates were issued.
        Each certificate starts with a line having its name guarded
        by '|||'.
        """
        persisted = self._configuration._storage.splitlines()
        if not persisted:
            # Cache is completely empty.
            return

        if persisted[0].strip() != self._configuration.acme_url:
            # We have a new acme_url so all certificates should be reloaded.
            # So we don't load anything as storage values are no longer
            # valid.
            return

        current_name = None
        current_lines = []
        for line in persisted[1:]:
            if '|||' not in line:
                current_lines.append(line)
                continue

            # We got a certificate delimiter.
            # Load the certificate which ends here and start a new one.
            self._loadEntry(current_name, current_lines)
            current_lines = []
            # The certificate name is guarded by '|||'.
            current_name = self._getNormalizedName(
                line.strip()[3:-3].strip())

        # Load the last certificate.
        self._loadEntry(current_name, current_lines)

    def _save(self):
        """
        Write all the stored certificates to the configuration.
        """
        entries = []
        for name, pem_objects in self._storage.items():
            header = '|||%s|||\n' % (name,)
            body = ''.join(o.as_bytes() for o in pem_objects)
            entries.append(header + body + '\n\n')

        first_line = self._configuration.acme_url + '\n'
        self._configuration._storage = first_line + '\n'.join(entries)
        self._configuration._proxy.save()
# Copyright (c) 2018 Adi Roiban.
# See LICENSE for details.
"""
Tests for Let's Encrypt resource.
"""
from __future__ import unicode_literals
import pem
from twisted.internet import reactor
from twisted.web import http
from twisted.web.client import Agent
try:
from acme import challenges
except ImportError:
challenges = None
from chevah.server import json
from chevah.server.commons.constant import (
CONFIGURATION_DEFAULTS,
TYPE_NAME,
)
from chevah.server.commons.exception import (
NoSuchAttributeError,
)
from chevah.server.commons.interface import ILetsEncryptClientConfiguration
from chevah.server.resource.lets_encrypt import (
CertificateStore,
HTTP01ChevahResponder,
LetsEncryptClientConfiguration,
LetsEncryptClientResource,
LetsEncryptHTTPFactory,
_PEMS_PLACEHOLDER,
)
from chevah.server.testing import (
attr,
HTTPServerContext,
ResponseDefinition,
WithProcessTestCase,
mk,
)
from chevah.server.testing.constant import SSL_DATA
from chevah.server.testing.proto_helpers import InMemoryTCPReactor
def setup_module():
    """
    Skip the tests from this module when the `acme` library could not be
    imported, as Let's Encrypt is not available on all systems.
    """
    if challenges is not None:
        return
    raise WithProcessTestCase.skipTest('No lets-encrypt support.')
class TestLetsEncryptClientConfiguration(WithProcessTestCase):
    """
    Tests for LetsEncryptClientConfiguration.
    """

    def test_init(self):
        """
        It is initialized with a .ini backend.
        """
        uuid = unicode(mk.uuid4())
        content = (
            '[resources/%s]\n'
            'enabled: no\n'
            'name: some-name\n'
            'type: lets-encrypt\n'
            'description: bla bla\n'
            'address: 1.2.3.5\n'
            'port: 2532\n'
            'acme_url: \thttp://acme.com/directory \n'
            'redirect_url: \thttps://ftp.domain.com/some-path/ \n'
            'debug: yes\n'
            '_account_key: some-key\n'
            '_storage: some-storage\n'
            ) % (uuid,)
        proxy = mk.makeFileConfigurationProxy(
            content, defaults=CONFIGURATION_DEFAULTS)
        parent = self.Bunch(_proxy=proxy, parent=None)

        sut = LetsEncryptClientConfiguration(parent, uuid)

        self.assertFalse(sut.enabled)
        self.assertEqual('some-name', sut.name)
        self.assertEqual('bla bla', sut.description)
        self.assertEqual('1.2.3.5', sut.address)
        self.assertEqual(2532, sut.port)
        self.assertProvides(ILetsEncryptClientConfiguration, sut)
        # The values are stripped of the surrounding white spaces.
        self.assertEqual('http://acme.com/directory', sut.acme_url)
        self.assertEqual('https://ftp.domain.com/some-path/', sut.redirect_url)
        self.assertIsTrue(sut.debug)
        # The private values are also loaded.
        self.assertEqual('some-key', sut._account_key)
        self.assertEqual('some-storage', sut._storage)
        self.assertFalse(sut.use_ssl)

    def test_register_default(self):
        """
        It is registered and can be created as a property with default
        values.
        """
        sut = self.process.configuration.resources.createProperty(
            '', {'type': TYPE_NAME.LETS_ENCRYPT})

        self.assertTrue(sut.enabled)
        self.assertProvides(ILetsEncryptClientConfiguration, sut)
        self.assertEqual(
            'https://acme-v02.api.letsencrypt.org/directory', sut.acme_url)
        self.assertEqual('', sut._account_key)
        self.assertEqual('', sut._storage)
        self.assertEqual('', sut.name)
        self.assertEqual('', sut.description)
        self.assertEqual('127.0.0.1', sut.address)
        self.assertEqual(80, sut.port)
        self.assertEqual('', sut.redirect_url)
        self.assertIsFalse(sut.debug)

    def test_private_fields(self):
        """
        The private members are not available to the public API.

        They can't be set when the property is created and are not part
        of the serialized property, even when they have a value.
        """
        resources = self.process.configuration.resources

        with self.assertRaises(NoSuchAttributeError):
            resources.createProperty(
                '',
                {'type': TYPE_NAME.LETS_ENCRYPT, '_account_key': 'something'})

        with self.assertRaises(NoSuchAttributeError):
            resources.createProperty(
                '',
                {'type': TYPE_NAME.LETS_ENCRYPT, '_storage': 'something'})

        sut = resources.createProperty(
            '', {'type': TYPE_NAME.LETS_ENCRYPT})
        # Both private members have a value, to check that they are
        # excluded even when they are set.
        sut._account_key = 'new-account-key'
        sut._storage = 'new-storage'

        result = sut.getProperty()

        self.assertNotContains('_account_key', result)
        self.assertNotContains('_storage', result)
class TestHTTP01ChevahResponder(WithProcessTestCase):
    """
    Unit tests for HTTP01ChevahResponder.
    """

    def test_init(self):
        """
        After initialization it only has the `test.txt` child, any other
        child is not found.
        """
        request = mk.makeTwistedWebRequest()

        sut = HTTP01ChevahResponder()

        ready_page = sut.getChildWithDefault(b'test.txt', request)
        body = ready_page.render(request)
        self.assertEqual(b'Let\'s Encrypt Ready', body)
        self.assertEqual(http.OK, request.code)

        missing_page = sut.getChildWithDefault(b'other-child', request)
        self.assertEqual(
            http.NOT_FOUND, missing_page.headersReceived(request))

    def test_start_responding_stop_responding(self):
        """
        The challenge is available over HTTP GET once started and is no
        longer available once stopped.
        """
        token = 'bla-\N{sun}'
        token_base64 = b'YmxhLeKYiQ'
        request = mk.makeTwistedWebRequest(method=b'GET')
        sut = HTTP01ChevahResponder()
        challenge = challenges.HTTP01(token=token.encode('utf-8'))
        response = challenges.HTTP01Response(key_authorization='tra-\N{leo}')

        sut.start_responding('domain-ignored', challenge, response)

        challenge_page = sut.getChildWithDefault(token_base64, request)
        body = challenge_page.render(request)
        self.assertEqual(b'tra-\xe2\x99\x8c', body)
        self.assertEqual(http.OK, request.code)

        sut.stop_responding('domain-ignored', challenge, 'ignored')

        challenge_page = sut.getChildWithDefault(token_base64, request)
        self.assertEqual(
            http.NOT_FOUND, challenge_page.headersReceived(request))

    def test_stop_responding_no_start(self):
        """
        Stopping a challenge which was never started does nothing.
        """
        token = 'bla-\N{sun}'
        token_base64 = b'YmxhLeKYiQ'
        request = mk.makeTwistedWebRequest(method=b'GET')
        sut = HTTP01ChevahResponder()
        challenge = challenges.HTTP01(token=token.encode('utf-8'))

        sut.stop_responding('domain-ignored', challenge, 'ignored')

        challenge_page = sut.getChildWithDefault(token_base64, request)
        self.assertEqual(
            http.NOT_FOUND, challenge_page.headersReceived(request))
def fail_on_call(domain, pem_objects):
    """
    Update handler for the tests in which no update is expected.

    It always raises `AssertionError`.
    """
    message = 'Should not be called.'
    raise AssertionError(message)
class TestCertificateStore(WithProcessTestCase):
"""
Unit tests for CertificateStore.
"""
def getConfiguration(self, content=''):
    """
    Return a Let's Encrypt resource configuration which has `content` as
    the persisted certificate storage.
    """
    self.process.reactor = self.clock
    options = {
        'type': TYPE_NAME.LETS_ENCRYPT,
        'acme_url': 'http://test.local:123/directory'
        }
    configuration = self.process.configuration.resources.createProperty(
        '', options)
    configuration._storage = content
    # Advance the clock to settle configuration updates.
    self.clock.advance(1)
    # FIXME:5580:
    mk.setReWritableBytesIO(self.process.configuration)
    return configuration
def test_init_empty(self):
    """
    It is initialized with the configuration, the update handler and the
    domains getter, and can read an empty cache.
    """
    store = CertificateStore(
        configuration=self.getConfiguration(content=''),
        on_update=fail_on_call,
        domains_getter=lambda: [],
        )

    certificates = self.successResultOf(store.as_dict())

    self.assertEqual({}, certificates)
def test_init_no_services(self):
    """
    It can be initialized while the process has no services.

    This happens during the main process initialization.
    """
    self.process.services = None
    store = CertificateStore(
        configuration=self.getConfiguration(content=''),
        on_update=fail_on_call,
        domains_getter=lambda: [],
        )

    certificates = self.successResultOf(store.as_dict())

    self.assertEqual({}, certificates)
def test_as_dict_single_domain(self):
    """
    A storage with a single certificate is loaded under its domain.
    """
    storage_content = (
        'http://test.local:123/directory\n'
        ' ||| ftp.example.com |||\n'
        '-----BEGIN RSA PRIVATE KEY-----\n'
        'MII private key here\n'
        '-----END RSA PRIVATE KEY-----\n'
        '-----BEGIN CERTIFICATE-----\n'
        'MIIEqz cert here\n'
        '-----END CERTIFICATE-----\n'
        )
    store = CertificateStore(
        configuration=self.getConfiguration(content=storage_content),
        on_update=fail_on_call,
        domains_getter=lambda: ['ftp.example.com'],
        )

    certificates = self.successResultOf(store.as_dict())

    self.assertItemsEqual(['ftp.example.com'], certificates.keys())
    self.assertEqual(2, len(certificates['ftp.example.com']))
def test_as_dict_multi_domains(self):
    """
    A storage with multiple certificates is loaded with each certificate
    under its own domain, including a domain without any object.
    """
    storage_content = (
        'http://test.local:123/directory\n'
        ' ||| ftp.example.com |||\n'
        '-----BEGIN RSA PRIVATE KEY-----\n'
        'MII private key here\n'
        '-----END RSA PRIVATE KEY-----\n'
        '-----BEGIN CERTIFICATE-----\n'
        'MIIEqz cert here\n'
        '-----END CERTIFICATE-----\n'
        '-----BEGIN CERTIFICATE-----\n'
        'MIIEqz cert here\n'
        '-----END CERTIFICATE-----\n'
        ' ||| web.domain.tld |||\n'
        '-----BEGIN RSA PRIVATE KEY-----\n'
        'MII private key here\n'
        '-----END RSA PRIVATE KEY-----\n'
        ' ||| empty.domain.tld |||\n'
        )
    configured_domains = [
        'ftp.example.com', 'web.domain.tld', 'empty.domain.tld']
    store = CertificateStore(
        configuration=self.getConfiguration(content=storage_content),
        on_update=fail_on_call,
        domains_getter=lambda: configured_domains,
        )

    certificates = self.successResultOf(store.as_dict())

    self.assertItemsEqual(
        ['ftp.example.com', 'web.domain.tld', 'empty.domain.tld'],
        certificates.keys(),
        )
    self.assertEqual(3, len(certificates['ftp.example.com']))
    self.assertEqual(1, len(certificates['web.domain.tld']))
    self.assertEqual(0, len(certificates['empty.domain.tld']))
def test_as_dict_new_acme_url(self):
    """
    The cache is ignored when the certificates were stored for an ACME
    server which is different from the configured one.

    The configured domains are "registered" without any certificate.
    """
    storage_content = (
        'http://NEW.ACME:123/directory\n'
        ' ||| ftp.example.com |||\n'
        '-----BEGIN RSA PRIVATE KEY-----\n'
        'MII private key here\n'
        '-----END RSA PRIVATE KEY-----\n'
        '-----BEGIN CERTIFICATE-----\n'
        'MIIEqz cert here\n'
        '-----END CERTIFICATE-----\n'
        '-----BEGIN CERTIFICATE-----\n'
        'MIIEqz cert here\n'
        '-----END CERTIFICATE-----\n'
        ' ||| web.domain.tld |||\n'
        '-----BEGIN RSA PRIVATE KEY-----\n'
        'MII private key here\n'
        '-----END RSA PRIVATE KEY-----\n'
        ' ||| empty.domain.tld |||\n'
        )
    configured_domains = [
        'ftp.example.com', 'web.domain.tld', 'empty.domain.tld']
    store = CertificateStore(
        configuration=self.getConfiguration(content=storage_content),
        on_update=fail_on_call,
        domains_getter=lambda: configured_domains,
        )

    certificates = self.successResultOf(store.as_dict())

    expected = {
        'empty.domain.tld': [],
        'ftp.example.com': [],
        'web.domain.tld': [],
        }
    self.assertEqual(expected, certificates)
def test_as_dict_different_domains(self):
    """
    The stored domains which are no longer configured are not returned.

    The newly configured domains are "registered" without any object.
    """
    storage_content = (
        'http://test.local:123/directory\n'
        ' ||| ftp.example.com , sftp.example.com |||\n'
        '-----BEGIN RSA PRIVATE KEY-----\n'
        'MII private key here\n'
        '-----END RSA PRIVATE KEY-----\n'
        '-----BEGIN CERTIFICATE-----\n'
        'MIIEqz cert here\n'
        '-----END CERTIFICATE-----\n'
        '-----BEGIN CERTIFICATE-----\n'
        'MIIEqz cert here\n'
        '-----END CERTIFICATE-----\n'
        ' ||| web.domain.tld |||\n'
        '-----BEGIN RSA PRIVATE KEY-----\n'
        'MII private key here\n'
        '-----END RSA PRIVATE KEY-----\n'
        ' ||| empty.domain.tld |||\n'
        )
    configured_domains = [
        'ftp.example.com, sftp.example.com',
        'other.domain.tld']
    store = CertificateStore(
        configuration=self.getConfiguration(content=storage_content),
        on_update=fail_on_call,
        domains_getter=lambda: configured_domains,
        )

    certificates = self.successResultOf(store.as_dict())

    self.assertItemsEqual(
        ['ftp.example.com, sftp.example.com', 'other.domain.tld'],
        certificates.keys())
    self.assertEqual(
        3, len(certificates['ftp.example.com, sftp.example.com']))
    self.assertEqual(0, len(certificates['other.domain.tld']))
def test_get_not_found_empty(self):
    """
    Getting the objects for a name fails when the storage is empty.
    """
    store = CertificateStore(
        configuration=self.getConfiguration(content=''),
        on_update=fail_on_call,
        domains_getter=lambda: ['any-name'],
        )

    error = self.failureResultOf(store.get('any-name')).value

    self.assertServerException('No certificate for "any-name".', error)
def test_get_not_found(self):
    """
    Getting the objects for a name fails when the storage has entries,
    but none for the requested name.
    """
    raw_storage = (
        'http://test.local:123/directory\n'
        ' ||| ftp.example.com |||\n'
        '-----BEGIN RSA PRIVATE KEY-----\n'
        'MII private key here\n'
        '-----END RSA PRIVATE KEY-----\n'
        '-----BEGIN CERTIFICATE-----\n'
        'MIIEqz cert here\n'
        '-----END CERTIFICATE-----\n'
        )
    store = CertificateStore(
        configuration=self.getConfiguration(content=raw_storage),
        on_update=fail_on_call,
        domains_getter=lambda: ['ftp.example.com'],
        )

    error = self.failureResultOf(store.get('any-name')).value

    self.assertServerException('No certificate for "any-name".', error)
def test_get_found(self):
    """
    Return a deferred which fires with the objects stored for the
    requested name.
    """
    raw_storage = (
        'http://test.local:123/directory\n'
        ' ||| ftp.example.com |||\n'
        '-----BEGIN RSA PRIVATE KEY-----\n'
        'MII private key here\n'
        '-----END RSA PRIVATE KEY-----\n'
        '-----BEGIN CERTIFICATE-----\n'
        'MIIEqz cert here\n'
        '-----END CERTIFICATE-----\n'
        ' ||| http.example.com , www.example.com |||\n'
        '-----BEGIN CERTIFICATE-----\n'
        'MIIEqz cert here\n'
        '-----END CERTIFICATE-----\n'
        )
    store = CertificateStore(
        configuration=self.getConfiguration(content=raw_storage),
        on_update=fail_on_call,
        domains_getter=lambda: ['ftp.example.com'],
        )

    # Single domain entry holding a key and a certificate.
    single_entry = self.successResultOf(store.get('ftp.example.com'))
    self.assertEqual(2, len(single_entry))

    # Multi domain entry, requested using the normalized name.
    multi_entry = self.successResultOf(
        store.get('http.example.com, www.example.com'))
    self.assertEqual(1, len(multi_entry))
def test_store_new(self):
    """
    Storing the objects for a new server name adds a new entry, saves it
    to the configuration and calls the update hook.
    """
    updates = []
    domain = 'new.example.com'
    certs = (
        '-----BEGIN RSA PRIVATE KEY-----\n'
        'MII private key here\n'
        '-----END RSA PRIVATE KEY-----\n'
        '-----BEGIN CERTIFICATE-----\n'
        'MIIEqz cert here\n'
        '-----END CERTIFICATE-----\n'
        )
    configuration = self.getConfiguration(content='')
    store = CertificateStore(
        configuration=configuration,
        on_update=lambda d, o: updates.append((d, o)),
        domains_getter=lambda: [domain],
        )
    pem_objects = pem.parse(certs)

    store.store(domain, pem_objects)

    # The new entry is available from the store.
    stored = self.successResultOf(store.get(domain))
    self.assertEqual(2, len(stored))
    entries = self.successResultOf(store.as_dict())
    self.assertItemsEqual([domain], entries.keys())
    # The raw configuration storage was updated.
    expected_storage = ''.join([
        'http://test.local:123/directory\n',
        '|||new.example.com|||\n',
        certs,
        '\n\n',
        ])
    self.assertEqual(expected_storage, configuration._storage)
    # The update hook was called once.
    self.assertEqual([(domain, pem_objects)], updates)
def test_store_multi_domain(self):
    """
    Storing the objects for a multi domain certificate adds a new entry
    under the normalized comma separated list of domains.
    """
    updates = []
    normalized = 'new.example.com, ftp.example.com, example.com'
    certs = (
        '-----BEGIN CERTIFICATE-----\n'
        'MIIEqz cert here\n'
        '-----END CERTIFICATE-----\n'
        )
    configuration = self.getConfiguration(content='')
    store = CertificateStore(
        configuration=configuration,
        on_update=lambda d, o: updates.append((d, o)),
        domains_getter=lambda: [normalized],
        )
    pem_objects = pem.parse(certs)

    # The names used for storing are not normalized.
    store.store(
        ' new.example.com,ftp.example.com , example.com ', pem_objects)

    stored = self.successResultOf(store.get(normalized))
    self.assertEqual(1, len(stored))
    entries = self.successResultOf(store.as_dict())
    self.assertItemsEqual([normalized], entries.keys())
    expected_storage = ''.join([
        'http://test.local:123/directory\n',
        '|||new.example.com, ftp.example.com, example.com|||\n',
        certs,
        '\n\n',
        ])
    self.assertEqual(expected_storage, configuration._storage)
    self.assertEqual([(normalized, pem_objects)], updates)
def test_store_update(self):
    """
    Storing the objects for an existing domain replaces the previous
    objects and saves the result to the configuration.
    """
    updates = []
    target = 'ftp.example.com, sftp.example.com'
    untouched = 'empty.domain.tld'
    existing = (
        'http://test.local:123/directory\n'
        '|||ftp.example.com, sftp.example.com |||\n'
        '-----BEGIN RSA PRIVATE KEY-----\n'
        'MII private key here\n'
        '-----END RSA PRIVATE KEY-----\n'
        '-----BEGIN CERTIFICATE-----\n'
        'MIIEqz cert here\n'
        '-----END CERTIFICATE-----\n'
        ' ||| empty.domain.tld |||\n'
        )
    update = (
        '-----BEGIN CERTIFICATE-----\n'
        'MIIEqz New CERTS\n'
        '-----END CERTIFICATE-----\n'
        )
    configuration = self.getConfiguration(content=existing)
    store = CertificateStore(
        configuration=configuration,
        on_update=lambda d, o: updates.append((d, o)),
        domains_getter=lambda: [target, untouched],
        )
    pem_objects = pem.parse(update)

    store.store(target, pem_objects)

    # Only the new object is found for the updated entry.
    stored = self.successResultOf(store.get(target))
    self.assertEqual(1, len(stored))
    entries = self.successResultOf(store.as_dict())
    self.assertItemsEqual([target, untouched], entries.keys())
    expected_storage = ''.join([
        'http://test.local:123/directory\n',
        '|||ftp.example.com, sftp.example.com|||\n',
        update,
        '\n\n\n',
        '|||empty.domain.tld|||\n\n\n',
        ])
    self.assertEqual(expected_storage, configuration._storage)
    self.assertEqual([(target, pem_objects)], updates)
class TestLetsEncryptHTTPFactory(WithProcessTestCase):
    """
    Unit tests for LetsEncryptHTTPFactory.
    """

    def test_init(self):
        """
        It is initialized with the resource configuration and, without a
        redirection, only serves the ACME challenge pages.
        """
        errors = []
        resources = self.process.configuration.resources
        configuration = resources.createProperty(
            '', {'type': TYPE_NAME.LETS_ENCRYPT})

        factory = LetsEncryptHTTPFactory(
            service_configuration=configuration,
            on_error=lambda error: errors.append(error),
            )

        # Responder is available... so that we can inject responses.
        self.assertIsInstance(HTTP01ChevahResponder, factory.responder)

        # The responder is available at expected URL.
        request = mk.makeTwistedWebRequest(
            uri='/.well-known/acme-challenge/test.txt')
        page = self.successResultOf(factory.getResourceFor(request))
        body = page.render(request)
        self.assertEqual(b'Let\'s Encrypt Ready', body)
        self.assertEqual(http.OK, request.code)

        # Other pages got page not found.
        request = mk.makeTwistedWebRequest([b'other', 'page'])
        page = self.successResultOf(factory.getResourceFor(request))
        code = page.headersReceived(request)
        self.assertEqual(http.NOT_FOUND, code)

        # Root page is empty.
        request = mk.makeTwistedWebRequest([b''])
        page = self.successResultOf(factory.getResourceFor(request))
        body = page.render(request)
        self.assertEqual(b'', body)
        self.assertEqual(http.OK, request.code)

        # Base ACME folder is empty.
        request = mk.makeTwistedWebRequest(
            [b'.well-known', 'acme-challenge'])
        page = self.successResultOf(factory.getResourceFor(request))
        body = page.render(request)
        self.assertEqual(b'', body)
        self.assertEqual(http.OK, request.code)

    def test_init_redirection(self):
        """
        When configured to redirect requests, it will serve the ACME
        challenges but redirect any other request.
        """
        errors = []
        resources = self.process.configuration.resources
        configuration = resources.createProperty(
            '',
            {'type': TYPE_NAME.LETS_ENCRYPT, 'redirect_url': 'http://a.c'},
            )

        factory = LetsEncryptHTTPFactory(
            service_configuration=configuration,
            on_error=lambda error: errors.append(error),
            )

        # Responder is available... so that we can inject responses.
        self.assertIsInstance(HTTP01ChevahResponder, factory.responder)

        # The responder is available at expected URL.
        request = mk.makeTwistedWebRequest(
            uri='/.well-known/acme-challenge/test.txt')
        page = self.successResultOf(factory.getResourceFor(request))
        body = page.render(request)
        self.assertEqual(b'Let\'s Encrypt Ready', body)
        self.assertEqual(http.OK, request.code)

        # Other pages are redirected.
        request = mk.makeTwistedWebRequest([b'other', 'page'])
        page = self.successResultOf(factory.getResourceFor(request))
        body = page.render(request)
        self.assertEqual(http.MOVED_PERMANENTLY, request.code)

        # Root page is a redirection.
        request = mk.makeTwistedWebRequest([b''])
        page = self.successResultOf(factory.getResourceFor(request))
        body = page.render(request)
        self.assertContains(
            b'<a href="http://a.c">click here</a>\n', body)
        self.assertEqual(http.MOVED_PERMANENTLY, request.code)

        # Base ACME folder is empty.
        request = mk.makeTwistedWebRequest(
            [b'.well-known', 'acme-challenge'])
        page = self.successResultOf(factory.getResourceFor(request))
        body = page.render(request)
        self.assertEqual(b'', body)
        self.assertEqual(http.OK, request.code)
class TestLetsEncryptClientResource(WithProcessTestCase):
    """
    Tests for the Let's Encrypt HTTP site which serves as the Let's Encrypt
    client service.
    """

    # Template of the ACME directory served by the testing HTTP server.
    # It is serialized to JSON and the `%(address)s` placeholders are
    # replaced with the `ip:port` of the testing server by `updateResponse`.
    DIRECTORY_BODY = {
        "keyChange": "http://%(address)s/acme/rollover-account-key",
        "meta": {
            "caaIdentities": ["letsencrypt.org"],
            "termsOfService": "http://%(address)s/tos.txt",
            "website": "http://%(address)s/website"
            },
        "newOrder": "http://%(address)s/acme/new-order",
        "newAccount": "http://%(address)s/acme/new-account",
        "newNonce": "http://%(address)s/acme/new-nonce",
        "revokeCert": "http://%(address)s/acme/revoke-cert",
        }
def getResource(
        self,
        configuration=None,
        account_key=SSL_DATA['2048_KEY_DATA'],
        raw_storage=None,
        fake_reactor=True,
        ):
    """
    Return a new LetsEncryptClientResource which is already set for a
    domain.

    By default it has the `localhost` domain with a cert which is already
    stored and doesn't need an update.

    `configuration` is an optional dict with options which overwrite the
    default resource configuration.
    `account_key` is set as the `_account_key` of the resource
    configuration.
    `raw_storage` is set as the `_storage` of the resource configuration.
    When it is `None`, a storage with the server certificate for
    `localhost`, issued by the configured `acme_url`, is used.
    When `fake_reactor` is true, the scheduler of the returned resource is
    replaced with the in-memory reactor available as `self.clock`.
    """
    self.clock = InMemoryTCPReactor(expected_addresses=[('127.0.0.1', 80)])
    self.process.reactor = self.clock
    resources = self.process.configuration.resources

    # A service which uses the `localhost` domain.
    self.process.configuration.services.createProperty('', {
        'type': TYPE_NAME.HTTPS,
        'configuration/ssl_domains': ['localhost'],
        })

    new_configuration = {
        'type': TYPE_NAME.LETS_ENCRYPT,
        'acme_url': 'http://127.0.0.1/directory',
        'address': '127.0.0.1',
        'port': 80,
        }
    # `None` is the default to avoid sharing a mutable dict between calls.
    if configuration:
        new_configuration.update(configuration)

    resource = resources.createProperty('', new_configuration)

    if raw_storage is None:
        raw_storage = (
            new_configuration['acme_url'] + '\n' +
            '|||localhost|||\n' +
            SSL_DATA['SERVER_CERT_DATA']
            )

    resource._account_key = account_key
    resource._storage = raw_storage

    # Let the configuration updates to settle.
    self.clock.advance(1)

    result = LetsEncryptClientResource(
        parent=resources,
        configuration=resource,
        )
    if fake_reactor:
        result._scheduler = self.clock
    return result
def getServerResponses(self):
    """
    Return the list of response definitions for the testing ACME server.

    The order is: directory, new nonce, new account and new order.
    The content of the directory response is a placeholder which needs
    to be updated once the address of the testing server is known.
    """
    directory = ResponseDefinition(
        url='/directory',
        response_content='to-be-updated',
        content_type='application/json',
        )

    nonce = ResponseDefinition(
        method='HEAD',
        url='/acme/new-nonce',
        response_headers=[('Replay-Nonce', 'test1111')],
        )

    # This uses the default key.
    account_body = (
        '{"status": "valid", "key": {"e": "AQAB", "kty": "RSA", "n": '
        '"zLUJYbSpjSAOSpxfns_w111mRls_FrHIC358fCxZsWzVXX_67uzM9TExAKtt'
        'y3jrY1EV3C2-JcAIpwLTHhVHQL9ihqMu1Tp82fEoQtyqc68mGJFQP0vXE9I4P'
        'OHGknpjH9vkHBzC-6V3FSFL3E6aUcfdqGVOquWiLgnE2PSpV-mZtXGceU2oP7'
        'ERQAMpvT4ZPy2Pe9JcBn1KZrLSrzcdoBbY-q0yCsEemtxMXyVB9Y-0_yRCHAB'
        '4f0z8ipncdWhiLQcgegCx0Bd6h4jqNSNBmoOIkzT7vMiQsJifmsJ6l6T4uUDN'
        'q9lmiGK53pPGZIVyUQeO7QrXQSNBQ7qcWpxKhw"},'
        '"agreement": "http://localhost/acme/ter"'
        '}'
        )
    account_headers = [
        ('Replay-Nonce', 'test2222'),
        ('Link', '<http://localhost/acme/new-authz>;rel="next"'),
        ('Link', '<http://localhost/acme/recover-reg>;rel="recover"'),
        ('Link', '<http://localhost/acme/ter>;rel="terms-of-service"'),
        ]
    registration = ResponseDefinition(
        method='POST',
        url='/acme/new-account',
        request=ResponseDefinition.ANY,
        response_code=http.CREATED,
        response_content=account_body,
        content_type='application/json',
        response_headers=account_headers,
        )

    order = ResponseDefinition(
        method='POST',
        url='/acme/new-order',
        request=ResponseDefinition.ANY,
        content_type='application/json',
        response_code=http.CREATED,
        response_content='{"bla": "la"}',
        response_headers=[('Replay-Nonce', 'test3333')],
        )

    return [directory, nonce, registration, order]
def updateResponse(self, directory_response, httpd):
    """
    Set the content of the directory response to the ACME directory in
    which all the URLs are pointing to the testing server.
    """
    body_template = json.dumps(self.DIRECTORY_BODY)
    address = '%s:%s' % (httpd.ip, httpd.port)
    directory_response.updateResponseContent(
        body_template % {'address': address})
def test_init(self):
    """
    It creates a site which has the ACME well-known folder available.
    """
    resources = self.process.configuration.resources
    configuration = resources.createProperty(
        '', {'type': TYPE_NAME.LETS_ENCRYPT})

    sut = LetsEncryptClientResource(
        parent=resources, configuration=configuration)

    # ACME test page.
    request = mk.makeTwistedWebRequest(
        [b'.well-known', 'acme-challenge', 'test.txt'])
    page = self.successResultOf(sut._factory.getResourceFor(request))
    body = page.render(request)
    self.assertEqual(b"Let's Encrypt Ready", body)
    self.assertEqual(http.OK, request.code)
def test_start_no_acme_url(self):
    """
    It fails to start when the `acme_url` option is empty.
    """
    without_url = {'acme_url': ''}
    sut = self.getResource(configuration=without_url)

    self.assertRunnableFailsToStart(
        sut,
        details='Missing URL to the Let\'s Encrypt Server Directory.',
        )
def test_start_not_supported(self):
    """
    It fails to start when running on an OS which is not supported.
    """
    sut = self.getResource()
    # On OS without support, the JWS is not imported.
    sut._JWSClient = None

    self.assertRunnableFailsToStart(
        sut,
        details='Let\'s Encrypt is not yet supported on this OS.',
        )
def test_start_account_key_fail_load(self):
    """
    It fails to start when the existing account key can't be loaded.
    """
    sut = self.getResource(account_key='invalid data')

    expected = (
        'Failed to load the account key. '
        'Could not deserialize key data.'
        )
    if self.os_name != 'aix':
        # On AIX we don't get all the details.
        expected += (
            ' '
            'The data may be in an incorrect format or '
            'it may be encrypted with an unsupported algorithm.'
            )

    self.assertRunnableFailsToStart(
        sut, details=self.Contains(expected))
def test_start_fail_to_start_http_server(self):
    """
    It fails to start when the local HTTP server can't be started.
    """
    sut = self.getResource(
        configuration={'address': '1.2.3.4', 'port': 43})
    # Put back the real reactor,
    sut._scheduler = reactor

    prefix = 'Failed to start the local HTTP-01 challenge server. '
    if self.TEST_LANGUAGE == 'FR':
        reason = 'CannotListenError('
    else:
        reason = 'Couldn\'t listen on 1.2.3.4:43: [Errno '
    details = self.Contains(prefix + reason)

    self.assertRunnableFailsToStart(sut, details=details)
def test_start_account_key_not_defined(self):
    """
    When no account key is defined, a new key is automatically created
    and saved.

    The testing ACME server has no responses defined, so the start
    fails afterwards. We only care that the key is created.
    """
    no_responses = []
    with HTTPServerContext(no_responses) as acme_server:
        self._check_start_account_key_not_defined(acme_server)
def assertFactoryError(self, message):
    """
    Run the reactor and check that the resource failed to start with
    `message` as details, and was then stopped.
    """
    self.executeReactor()
    self.clock.advance(1)

    # The ACME url is validated.
    self.assertEvent('40033', reason='HTTP connection made.')
    self.assertEventUnordered('40032', reason='HTTP connection close.')

    expected_data = {'details': message}
    self.assertEvent(
        '20158', data=expected_data, reason='Resource failed to start.')
    self.assertEvent('20157', reason='Resource stopped.')
def _check_start_account_key_not_defined(self, http_server):
    """
    Check the account key generation while the ACME server available as
    `http_server` is up.
    """
    stream = mk.setReWritableBytesIO(self.process.configuration)
    directory_url = 'http://%s:%s/directory' % (
        http_server.ip, http_server.port)
    sut = self.getResource(
        configuration={'acme_url': directory_url},
        account_key='',
        )
    sut._KEY_SIZE = 512
    self.assertIsEmpty(sut.configuration._account_key)

    sut.start()

    # New account is updated and store.
    self.assertIsNotEmpty(sut.configuration._account_key)
    self.assertContains(
        b'_account_key = -----BEGIN RSA PRIVATE KEY',
        stream.content_at_close,
        )
    expected_error = self.Contains('Failed to discover the ACME server.')
    self.assertFactoryError(expected_error)
    # Let the HTTP client to close.
    self.executeReactor()
def test_start_acme_url_not_reachabe(self):
    """
    It fails to start when the configured ACME URL is not reachable.
    """
    # Hope nobody is listening on this port.
    unreachable = {'acme_url': 'http://127.0.0.1:4214'}
    sut = self.getResource(configuration=unreachable)

    self.getDeferredResult(sut.start())

    expected_details = self.Contains(
        'Failed to discover the ACME server. '
        'Connection was refused'
        )
    self.assertEvent(
        '20158',
        data={'details': expected_details},
        reason='Resource failed to start.',
        )
    self.assertEvent('20157', reason='Resource stopped.')
def test_start_acme_url_not_directory(self):
    """
    It fails to start when the configured ACME URL is available, but the
    response is not served as JSON.
    """
    not_json = ResponseDefinition(
        url='/directory',
        response_content='not-json',
        )

    with HTTPServerContext([not_json]) as acme_server:
        sut = self.getResource(configuration={
            'acme_url': 'http://%s:%s/directory' % (
                acme_server.ip, acme_server.port),
            })

        sut.start()

        self.assertFactoryError(
            'Failed to discover the ACME server. '
            "Unexpected response Content-Type: 'text/html'. "
            "Expecting 'application/json'."
            )
def test_start_acme_url_directory_bad_json(self):
    """
    It fails to start when the configured ACME URL is available and
    served as JSON, but the body is not valid JSON.
    """
    bad_json = ResponseDefinition(
        url='/directory',
        response_content='not-json',
        content_type='application/json',
        )

    with HTTPServerContext([bad_json]) as acme_server:
        sut = self.getResource(configuration={
            'acme_url': 'http://%s:%s/directory' % (
                acme_server.ip, acme_server.port),
            })

        sut.start()

        self.assertFactoryError(
            'Failed to discover the ACME server. '
            'Missing JSON body.'
            )
def test_start_acme_url_empty_directory(self):
    """
    It fails to start when the configured ACME URL returns a JSON
    directory, but not all the fields are found.
    """
    empty_directory = ResponseDefinition(
        url='/directory',
        response_content='{}',
        content_type='application/json',
        )

    with HTTPServerContext([empty_directory]) as acme_server:
        sut = self.getResource(configuration={
            'acme_url': 'http://%s:%s/directory' % (
                acme_server.ip, acme_server.port),
            })

        sut.start()

        self.assertFactoryError(
            'Failed to discover the ACME server. '
            'Directory has no newNonce URL'
            )
def test_start_register_bad_response(self):
    """
    It fails to start when the configured ACME server has a valid
    directory, but the client registration fails.
    """
    # The order response is not needed.
    responses = self.getServerResponses()[:3]

    with HTTPServerContext(responses) as acme_server:
        sut = self.getResource(configuration={
            'acme_url': 'http://%s:%s/directory' % (
                acme_server.ip, acme_server.port),
            })
        self.updateResponse(responses[0], acme_server)
        # The registration response has no headers, so no nonce.
        responses[2].response_headers = {}

        sut.start()

        expected_error = self.Contains(
            'Failed to register the ACME server. '
            'Server POST response did not include a replay nonce'
            )
        self.assertFactoryError(expected_error)
def test_getCertificatePEMs_stoped_no_cache(self):
    """
    When the Let's Encrypt resource is stopped and there is no cached
    cert for the domain, it returns the placeholder.
    """
    sut = self.getResource()

    pems = sut.getCertificatePEMs('any.host.com')

    self.assertEqual(_PEMS_PLACEHOLDER, pems)
def test_getCertificatePEMs_stoped_with_cache(self):
    """
    When the Let's Encrypt resource is stopped and a cert is cached for
    the domain, it returns the cached cert.
    """
    directory_url = 'http://localhost.acme:12/directory'
    cached = ''.join([
        'http://localhost.acme:12/directory\n',
        '|||localhost|||\n',
        SSL_DATA['SERVER_CERT_DATA'],
        ])
    sut = self.getResource(
        configuration={'acme_url': directory_url},
        raw_storage=cached,
        )

    pems = sut.getCertificatePEMs('localhost')

    self.assertEqual(SSL_DATA['SERVER_CERT_DATA'].strip(), pems)
def assertCleanStop(self, sut):
    """
    Stop `sut` and check that only the connection close and the resource
    stop events are emitted.
    """
    # It stops clean.
    self.getDeferredResult(sut.stop())

    # The connection is only closed in the end as a single connection
    # is used for all the requests.
    self.assertEvent('40032', reason='HTTP connection close.')
    self.assertEvent('20157', reason='Resource stoped.')
def checkStartWithoutAnyCheck(self, sut):
    """
    Start `sut` and check that no other event is emitted after the
    resource is started, so no new certificate is requested.
    """
    self.getResult(sut.start())

    # The ACME url is validated.
    self.assertEvent('40033', reason='HTTP connection made.')
    self.assertEvent('20156', reason='Resource started.')

    # No check is running.
    ongoing_check = sut._acme_service._ongoing_check
    self.getResult(ongoing_check)
    self.assertEventsQueueIsEmpty()
def test_getCertificatePEMs_started_no_cache_at_start(self):
    """
    When a service exists before the resource is started and no cert is
    cached for its domain, the start triggers a request for a new
    certificate and the placeholder is returned.
    """
    domain = 'any.host.com'
    responses = self.getServerResponses()

    with HTTPServerContext(responses) as acme_server:
        sut = self.getResource(configuration={
            'acme_url': 'http://%s:%s/directory' % (
                acme_server.ip, acme_server.port),
            })
        self.updateResponse(responses[0], acme_server)
        # The service is created before the resource is started.
        self.process.configuration.services.createProperty('', {
            'type': TYPE_NAME.HTTPS,
            'configuration/ssl_domains': [domain],
            })
        self.clock.advance(1)

        self.getResult(sut.start())

        # The ACME url is validated.
        self.assertEvent('40033', reason='HTTP connection made.')
        self.assertEvent('20156', reason='Resource started.')

        # Let the first check to try to get the configured cert.
        self.getResult(sut._acme_service._ongoing_check)
        self.assertEvent(
            '20017',
            data={'domains': domain},
            reason='Failed to get cert from ACME.',
            )

        # Getting the certificate returns the placeholder.
        pems = sut.getCertificatePEMs(domain)
        self.assertEqual(_PEMS_PLACEHOLDER, pems)

        self.assertCleanStop(sut)
def test_getCertificatePEMs_started_no_cache_after_start(self):
    """
    When the resource is started and the certificate for a new service
    is requested, the placeholder is returned and the creation of a new
    certificate is triggered.
    """
    domain = 'any.host.com'
    responses = self.getServerResponses()

    with HTTPServerContext(responses) as acme_server:
        sut = self.getResource(configuration={
            'acme_url': 'http://%s:%s/directory' % (
                acme_server.ip, acme_server.port),
            })
        self.updateResponse(responses[0], acme_server)
        self.checkStartWithoutAnyCheck(sut)
        # A new service is created after the resource is started.
        self.process.configuration.services.createProperty('', {
            'type': TYPE_NAME.HTTPS,
            'configuration/ssl_domains': [domain],
            })
        self.clock.advance(1)

        # Getting the certificate returns the placeholder.
        pems = sut.getCertificatePEMs(domain)
        self.assertEqual(_PEMS_PLACEHOLDER, pems)

        self.getResult(sut._ongoing_new_cert)
        self.assertEvent(
            '20017',
            data={'domains': domain},
            reason='Failed to get cert from ACME.',
            )

        self.assertCleanStop(sut)
def test_getCertificatePEMs_started_with_cache(self):
    """
    When the Let's Encrypt resource is started and a cert is cached for
    the domain, it returns the cached cert and does nothing else.
    """
    services = self.process.configuration.services
    responses = self.getServerResponses()

    with HTTPServerContext(responses) as acme_server:
        sut = self.getResource(configuration={
            'acme_url': 'http://%s:%s/directory' % (
                acme_server.ip, acme_server.port),
            })
        self.updateResponse(responses[0], acme_server)
        # We have a service which is no SSL.
        services.createProperty('', {'type': TYPE_NAME.HTTP})
        # We have a service which is SSL, but has no let's encrypt.
        services.createProperty('', {'type': TYPE_NAME.HTTPS})
        self.checkStartWithoutAnyCheck(sut)

        pems = sut.getCertificatePEMs('localhost')

        self.assertEqual(SSL_DATA['SERVER_CERT_DATA'].strip(), pems)
        self.assertCleanStop(sut)
def test_start_register_ok(self):
    """
    Once the registration succeeds and no certs need to be updated,
    nothing else is done at start.

    A new check is done after the check interval.
    """
    responses = self.getServerResponses()

    with HTTPServerContext(responses) as acme_server:
        sut = self.getResource(configuration={
            'acme_url': 'http://%s:%s/directory' % (
                acme_server.ip, acme_server.port),
            })
        self.updateResponse(responses[0], acme_server)
        self.checkStartWithoutAnyCheck(sut)
        acme_service = sut._acme_service

        # Reset the certificates for localhost to trigger a cert update
        # on next check, without saving the file.
        acme_service.cert_store._storage['localhost'] = []
        interval = acme_service.check_interval.total_seconds()
        self.clock.advance(interval + 1)

        self.getResult(acme_service._ongoing_check)
        self.assertEvent(
            '20017',
            data={'domains': 'localhost'},
            reason='Failed to get cert from ACME.',
            )

        self.assertCleanStop(sut)
def test_start_new_acme_url(self):
    """
    When the stored certificates were issued using a different ACME URL,
    the cache is invalidated and a new certificate check is triggered.
    """
    # The cache has a valid cert for localhost, but is marked
    # as being issued by http://other.acme.com/directory
    other_storage = ''.join([
        'http://other.acme.com/directory\n',
        '|||localhost|||\n',
        SSL_DATA['SERVER_CERT_DATA'],
        ])
    responses = self.getServerResponses()

    with HTTPServerContext(responses) as acme_server:
        sut = self.getResource(
            configuration={
                'acme_url': 'http://%s:%s/directory' % (
                    acme_server.ip, acme_server.port),
                },
            raw_storage=other_storage,
            )
        self.updateResponse(responses[0], acme_server)

        self.getResult(sut.start())

        # The ACME url is validated.
        self.assertEvent('40033', reason='HTTP connection made.')
        self.assertEvent('20156', reason='Resource started.')

        # A new certificate check is scheduled without waiting for next
        # day.
        # The renewal fails, but we only want to make sure that the
        # check is running.
        self.getResult(sut._acme_service._ongoing_check)
        self.assertEvent(
            '20017',
            data={'domains': 'localhost'},
            reason='Failed to get cert from ACME.',
            )

        self.assertCleanStop(sut)
@attr('slow')
def test_start_functional_staging_ok(self):
    """
    It can start with the public staging Let's Encrypt server and
    provides an HTTP server for the ACME HTTP-01 challenge.
    """
    staging = {
        'acme_url': (
            'https://acme-staging-v02.api.letsencrypt.org/directory'),
        'port': 3245,
        }
    sut = self.getResource(configuration=staging, fake_reactor=False)

    # This will hit the internet...so it can be slow.
    self.getResult(sut.start(), timeout=10)

    # The ACME url is validated.
    self.assertEvent('40033', reason='HTTP connection made.')
    self.assertEvent('20156', reason='Resource started.')

    # The local challenge server is serving the test page.
    challenge_url = (
        b'http://127.0.0.1:3245/.well-known/acme-challenge/test.txt')
    response = self.getResult(
        Agent(reactor).request(b'GET', challenge_url, None))
    self.assertEqual(200, response.code)
    self.assertEqual([b'Let\'s Encrypt Ready'], response._bodyBuffer)

    self.assertCleanStop(sut)
    # Wait for HTTPS SSL to close.
    self.executeReactor(timeout=2)
@attr('slow')
def test_start_functional_prod_v1_to_v2(self):
    """
    It can start with a V1 PROD directory URL and will automatically
    upgrade to the V2 URL.
    """
    if not self.os_version.startswith('ubuntu'):
        raise self.skipTest('Reduce usage of ACME PROD server.')

    production_v1 = {
        'acme_url': 'https://ACME-V1.api.letsencrypt.org/directory',
        'contact_email': 'admin@chevah.com',
        'port': 3245,
        }
    sut = self.getResource(
        configuration=production_v1, fake_reactor=False)

    # This will hit the Internet...so it can be slow.
    self.getResult(sut.start(), timeout=10)

    # The ACME url is validated.
    # FIXME:5626:
    # See how to preserve the 'acme-v02.api.letsencrypt.org' hostname.
    # Or have both hostname and actual IP
    expected_peer = {'hostname': '172.65.32.248'}
    self.assertEvent(
        '40033', data=expected_peer, reason='HTTP connection made.')
    self.assertEvent('20156', reason='Resource started.')

    self.assertCleanStop(sut)
    # Wait for HTTPS SSL to close.
    self.executeReactor()
def test_update_service_not_started(self):
    """
    When a new certificate is produced, it emits an event that the new
    certificate was created, and no service is started.
    """
    responses = self.getServerResponses()

    with HTTPServerContext(responses) as acme_server:
        sut = self.getResource(configuration={
            'acme_url': 'http://%s:%s/directory' % (
                acme_server.ip, acme_server.port),
            })
        self.updateResponse(responses[0], acme_server)
        self.checkStartWithoutAnyCheck(sut)
        # A service which is created, but never started.
        self.process.configuration.services.createProperty('', {
            'type': TYPE_NAME.HTTPS,
            'configuration/ssl_domains': ['other.domain.com'],
            })
        self.clock.advance(1)

        sut._onUpdate('localhost', [])

        self.assertEvent(
            '20016',
            data={'domains': 'localhost'},
            reason='New certificate generated.',
            )
        self.assertCleanStop(sut)
def test_update_service_started(self):
    """
    When a new certificate is produced for a service which is started,
    it emits an event that the new certificate was created and the
    service is restarted.
    """
    domain = 'new.domain'
    responses = self.getServerResponses()

    with HTTPServerContext(responses) as acme_server:
        sut = self.getResource(configuration={
            'acme_url': 'http://%s:%s/directory' % (
                acme_server.ip, acme_server.port),
            })
        self.updateResponse(responses[0], acme_server)
        self.checkStartWithoutAnyCheck(sut)

        # Create a service which is actually running.
        # We create after the let's encrypt resource is start to not
        # trigger a new certificate generation when the resource starts.
        service_options = {
            'type': TYPE_NAME.HTTPS,
            'port': mk.portNumber(),
            'name': 'new-https-service',
            'configuration/ssl_domains': [domain],
            }
        new_config = self.process.configuration.services.createProperty(
            '', service_options)
        self.clock.advance(1)
        new_service = self.process.services.getSection(new_config.uuid)
        self.getResult(new_service.start())
        self.assertEvent('20156', reason='Service started.')
        self.assertEventsQueueIsEmpty()

        sut._onUpdate(domain, [])

        self.assertEvent(
            '20016',
            data={'domains': domain},
            reason='New certificate generated.',
            )
        # The service is restarted.
        self.iterateReactor()
        self.assertEvent('20157', reason='Service stopped.')
        self.assertEvent('20156', reason='Service started.')

        self.getResult(new_service.stop())
        self.assertEvent('20157', reason='Service stopped.')
        self.assertCleanStop(sut)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment