Skip to content

Instantly share code, notes, and snippets.

@zengxs
Last active July 5, 2017 02:35
Show Gist options
  • Save zengxs/d90c6d403365992e855cd15cddee5800 to your computer and use it in GitHub Desktop.
Save zengxs/d90c6d403365992e855cd15cddee5800 to your computer and use it in GitHub Desktop.
Let's Encrypt ACME client protocol implementation
import base64
import copy
import hashlib
import json
import logging
import time
from collections import namedtuple
import requests
from jwt.algorithms import get_default_algorithms
from jwt.utils import base64url_encode, base64url_decode
from jwt.utils import force_bytes
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.asymmetric import ec
# Configure the root logger to emit messages from DEBUG level upwards.
logging.basicConfig(level=logging.DEBUG)
# ACME directory URL handed to Acme() in main(); the host name points at the
# Let's Encrypt staging service.
LE_SERVER = 'https://acme-staging.api.letsencrypt.org/directory'
def to_json(data):
    """Serialize *data* to compact JSON text with object keys sorted.

    No whitespace is written after ``,`` or ``:``.
    """
    compact = (',', ':')
    return json.dumps(data, separators=compact, sort_keys=True)
class AcmeError(IOError):
    """Raised when an ACME request does not succeed.

    The message is built from the ``detail`` and ``type`` members of the
    response's JSON body plus ``response.reason``.  When the body cannot be
    read that way, a generic message is used instead.
    """

    def __init__(self, response):
        try:
            body = response.json()
            detail = body.get('detail')
            kind = body.get('type') or 'unknown'
            text = '{} (Type: {}, HTTP {})'.format(
                detail, kind, response.reason)
        except (ValueError, TypeError, AttributeError):
            # Not a usable JSON error document: fall back to the generic text.
            text = "The ACME request failed."
        super().__init__(text)
class Account:
    """An ACME account described by its public key in JWK form.

    Attributes:
        jwk: dict holding the account's public JSON Web Key.
        uri: value passed as ``uri`` (``None`` when not given).
        jwk_thumbprint: unpadded base64url SHA-256 thumbprint of ``jwk``,
            computed once at construction time.
    """

    # RFC 7638 section 3.2: only these members take part in the thumbprint.
    _THUMBPRINT_MEMBERS = {
        'EC': ('crv', 'kty', 'x', 'y'),
        'RSA': ('e', 'kty', 'n'),
        'oct': ('k', 'kty'),
    }

    def __init__(self, jwk, uri=None):
        self.jwk = jwk
        self.uri = uri
        self.jwk_thumbprint = self.generate_jwk_thumbprint()

    def generate_jwk_thumbprint(self):
        """Return the SHA-256 JWK thumbprint of ``self.jwk`` as text.

        Optional members such as ``kid``, ``alg`` or ``use`` are left out of
        the hash input; including them would yield a thumbprint that differs
        from the one defined by RFC 7638.  For an unrecognised ``kty`` every
        member is hashed, which is what this method did previously.
        """
        required = self._THUMBPRINT_MEMBERS.get(self.jwk.get('kty'))
        if required is None:
            members = self.jwk
        else:
            members = {name: self.jwk[name]
                       for name in required if name in self.jwk}
        # Same canonical form as to_json(): sorted keys, no whitespace.
        canonical = json.dumps(members, sort_keys=True, separators=(',', ':'))
        digest = hashlib.sha256(canonical.encode('utf-8')).digest()
        return base64.urlsafe_b64encode(digest).rstrip(b'=').decode('ascii')
class Acme:
    """Small client for the endpoints listed in an ACME directory.

    On construction the directory at ``url`` is fetched: its JSON body is
    kept in ``self.dir`` and the ``Replay-Nonce`` response header in
    ``self.nonce``.  Every signed POST replaces ``self.nonce`` with the value
    from the server's reply.

    Args:
        url: ACME directory URL.
        account: object exposing the account's public key as ``account.jwk``.
        priv_key: private key handed to the PyJWT algorithm object for signing.
    """

    # JWS algorithm for each EC curve (RFC 7518 section 3.4).  P-521 pairs
    # with ES512; deriving the name from the curve digits would give "ES521".
    _EC_ALGORITHMS = {'P-256': 'ES256', 'P-384': 'ES384', 'P-521': 'ES512'}

    def __init__(self, url, account, priv_key):
        self.url = url
        self.account = account
        # Historical misspelling, kept so existing users of it keep working.
        self.accout = account
        self.jwk = account.jwk
        self.key = priv_key
        self._get_nonce()

    def _get_nonce(self):
        """Fetch the directory, store it and the fresh nonce, return the nonce."""
        r = requests.get(self.url)
        self.nonce = r.headers.get('Replay-Nonce')
        self.dir = r.json()
        return self.nonce

    def _generate_header(self):
        """Build the JWS header and the protected header for the next request.

        Returns:
            ``(header, protected)``: ``header`` holds ``alg`` and ``jwk``;
            ``protected`` is a deep copy of it with the current nonce added.

        Raises:
            ValueError: the key type or the EC curve is not supported.
        """
        kty = self.jwk.get('kty')
        if kty == 'EC':
            crv = self.jwk.get('crv')
            try:
                algorithm = self._EC_ALGORITHMS[crv]
            except (KeyError, TypeError):
                raise ValueError('Unsupported EC curve: {}.'.format(crv))
        elif kty == 'RSA':
            algorithm = 'RS256'
        else:
            raise ValueError('Invalid JWK key type: {}.'.format(kty))
        header = {'alg': algorithm, 'jwk': self.jwk}
        protected = copy.deepcopy(header)
        protected['nonce'] = self.nonce
        return header, protected

    def _sign_request(self, header, protected, payload, algorithm):
        """Return the JSON-serializable JWS body for ``payload``.

        Raises:
            NotImplementedError: PyJWT has no algorithm named ``algorithm``.
        """
        # Only the lookup can mean "unsupported"; a KeyError raised while
        # signing must not be relabelled.
        try:
            algorithm_obj = get_default_algorithms()[algorithm]
        except KeyError:
            raise NotImplementedError('Algorithm not supported')
        protected = base64url_encode(force_bytes(to_json(protected)))
        payload = base64url_encode(force_bytes(to_json(payload)))
        signing_input = b'.'.join([protected, payload])
        key = algorithm_obj.prepare_key(self.key)
        signature = algorithm_obj.sign(signing_input, key)
        return dict(
            header=header,
            protected=protected.decode('ascii'),
            payload=payload.decode('ascii'),
            signature=base64url_encode(signature).decode('ascii'))

    def _post(self, url, payload):
        """Sign ``payload``, POST it to ``url`` and return the response.

        The nonce for the next request is taken from the reply.
        """
        headers = {'Content-Type': 'application/json'}
        header, protected = self._generate_header()
        dat = self._sign_request(header, protected, payload, header.get('alg'))
        response = requests.post(url, json=dat, headers=headers)
        self.nonce = response.headers.get('Replay-Nonce')
        return response

    def get_authorization(self, uri):
        """GET ``uri`` and return its parsed JSON body.

        Raises:
            AcmeError: the body could not be parsed as JSON.
        """
        r = requests.get(uri)
        try:
            return r.json()
        except (ValueError, TypeError, AttributeError) as e:
            # Hand AcmeError the response (not the exception) like the
            # other call sites do.
            raise AcmeError(r) from e

    def new_authorization(self, domain):
        """Request a new authorization for the DNS name ``domain``.

        Returns:
            NewAuthorizationResult with the JSON body and the ``Location``
            header of the ``201 Created`` response.

        Raises:
            AcmeError: the server answered with any other status.
        """
        r = self._post(self.dir.get('new-authz'), {
            'resource': 'new-authz',
            'identifier': {'type': 'dns', 'value': domain},
        })
        if r.status_code == 201:  # Created
            return NewAuthorizationResult(r.json(), r.headers.get('Location'))
        raise AcmeError(r)

    def validate_authorization(self, uri, validate_type, key_authorizatioin):
        """Tell the server that the challenge at ``uri`` is ready to be checked.

        Returns:
            True when the server answers with a 2xx status.

        Raises:
            AcmeError: the server answered with any other status.
        """
        response = self._post(uri, {
            'resource': 'challenge',
            'type': validate_type,
            'keyAuthorization': key_authorizatioin,
        })
        if 200 <= response.status_code < 300:
            return True
        raise AcmeError(response)
# Result of Acme.new_authorization(): `contents` is the parsed JSON body of
# the response and `uri` is the value of its Location header.
NewAuthorizationResult = namedtuple('NewAuthorizationResult', 'contents uri')
def get_challenge(auth, method):
    """Return the challenge of the requested type from an authorization.

    Args:
        auth: object whose ``contents`` mapping may hold a ``'challenges'``
            list of challenge dicts.
        method: challenge type, compared with each entry's ``'type'``.

    Returns:
        The first matching challenge, or ``None`` when nothing matches.
    """
    offered = auth.contents.get('challenges') or []
    matching = list(filter(lambda item: item.get('type') == method, offered))
    return matching[0] if matching else None
def main():
    """Authorize a test domain interactively through the dns-01 challenge.

    Loads the account key and registration data from ``account-test.key`` and
    ``account-test.json``, requests an authorization, logs the TXT record to
    publish, waits for Enter, triggers validation and then polls the
    challenge every 5 seconds.

    Exits with status 0 when the domain is already authorized and with
    status 1 when no suitable challenge is offered or validation fails.
    """
    import sys  # local import: only this entry point needs it

    # test authorize
    validate_type = 'dns-01'
    domain = u'le-01.hachiman.gq'
    with open('account-test.key', 'rb') as key, open('account-test.json') as jf:
        info = json.loads(jf.read())
        pkey = serialization.load_pem_private_key(
            key.read(), None, default_backend())
    account = Account(info['contents']['key'], uri=info['uri'])
    acme = Acme(LE_SERVER, account, pkey)
    created = acme.new_authorization(domain)
    if created.contents.get('status') == 'valid':
        logging.info('{} is already authorized until {}.'.format(
            domain, created.contents.get('expires') or '(unknown)'))
        sys.exit(0)
    challenge = get_challenge(created, validate_type)
    if challenge is None:
        # get_challenge() returns None when the type is not offered.
        logging.error('{}: no {} challenge offered.'.format(
            domain, validate_type))
        sys.exit(1)
    key_auth = '{}.{}'.format(challenge.get('token'), account.jwk_thumbprint)
    # The TXT record value is the base64url SHA-256 of the key authorization.
    digest = hashlib.sha256()
    digest.update(key_auth.encode('ascii'))
    txt_record = base64url_encode(digest.digest()).decode('ascii')
    logging.info('_acme-challenge.{}. IN TXT \"{}\"'.format(domain, txt_record))
    input('Please Press Enter Key...')
    acme.validate_authorization(challenge['uri'], validate_type, key_auth)
    while True:
        logging.info('{}: waiting for verification. Checking in 5 seconds.'.format(domain))
        time.sleep(5)
        response = acme.get_authorization(challenge['uri'])
        status = response.get('status')
        if status == 'valid':
            logging.info('{}: OK! Authorization lasts util {}.'.format(domain, response.get('expires') or '(not provided)'))
            break
        if status == 'invalid':
            # Without this check a failed validation would poll forever.
            logging.error('{}: verification failed: {}'.format(
                domain, response.get('error') or '(no details)'))
            sys.exit(1)


if __name__ == '__main__':
    main()
import base64
import copy
import hashlib
import json
import logging
import time
from collections import namedtuple
import requests
from jwt.algorithms import get_default_algorithms
from jwt.utils import base64url_encode, base64url_decode
from jwt.utils import force_bytes
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.asymmetric import ec
# Configure the root logger to emit messages from DEBUG level upwards.
logging.basicConfig(level=logging.DEBUG)
# ACME directory URL handed to Acme() in main(); the host name points at the
# Let's Encrypt staging service.
LE_SERVER = 'https://acme-staging.api.letsencrypt.org/directory'
def to_json(data):
    """Return *data* as JSON text: keys sorted, no optional whitespace."""
    options = {'sort_keys': True, 'separators': (',', ':')}
    return json.dumps(data, **options)
class AcmeError(IOError):
    """Raised when an ACME request does not succeed.

    The message is built from the ``detail`` and ``type`` members of the
    response's JSON body plus ``response.reason``.  When the body cannot be
    read that way, a generic message is used instead.
    """

    def __init__(self, response):
        try:
            body = response.json()
            detail = body.get('detail')
            kind = body.get('type') or 'unknown'
            text = '{} (Type: {}, HTTP {})'.format(
                detail, kind, response.reason)
        except (ValueError, TypeError, AttributeError):
            # Not a usable JSON error document: fall back to the generic text.
            text = "The ACME request failed."
        super().__init__(text)
class Account:
    """An ACME account described by its public key in JWK form.

    Attributes:
        jwk: dict holding the account's public JSON Web Key.
        uri: value passed as ``uri`` (``None`` when not given).
        jwk_thumbprint: unpadded base64url SHA-256 thumbprint of ``jwk``,
            computed once at construction time.
    """

    # RFC 7638 section 3.2: only these members take part in the thumbprint.
    _THUMBPRINT_MEMBERS = {
        'EC': ('crv', 'kty', 'x', 'y'),
        'RSA': ('e', 'kty', 'n'),
        'oct': ('k', 'kty'),
    }

    def __init__(self, jwk, uri=None):
        self.jwk = jwk
        self.uri = uri
        self.jwk_thumbprint = self.generate_jwk_thumbprint()

    def generate_jwk_thumbprint(self):
        """Return the SHA-256 JWK thumbprint of ``self.jwk`` as text.

        Optional members such as ``kid``, ``alg`` or ``use`` are left out of
        the hash input; including them would yield a thumbprint that differs
        from the one defined by RFC 7638.  For an unrecognised ``kty`` every
        member is hashed, which is what this method did previously.
        """
        required = self._THUMBPRINT_MEMBERS.get(self.jwk.get('kty'))
        if required is None:
            members = self.jwk
        else:
            members = {name: self.jwk[name]
                       for name in required if name in self.jwk}
        # Same canonical form as to_json(): sorted keys, no whitespace.
        canonical = json.dumps(members, sort_keys=True, separators=(',', ':'))
        digest = hashlib.sha256(canonical.encode('utf-8')).digest()
        return base64.urlsafe_b64encode(digest).rstrip(b'=').decode('ascii')
class Acme:
    """Small client for the endpoints listed in an ACME directory.

    On construction the directory at ``url`` is fetched: its JSON body is
    kept in ``self.dir`` and the ``Replay-Nonce`` response header in
    ``self.nonce``.  Every signed POST replaces ``self.nonce`` with the value
    from the server's reply.

    Args:
        url: ACME directory URL.
        account: object exposing the account's public key as ``account.jwk``.
        priv_key: private key handed to the PyJWT algorithm object for signing.
    """

    # JWS algorithm for each EC curve (RFC 7518 section 3.4).  P-521 pairs
    # with ES512; deriving the name from the curve digits would give "ES521".
    _EC_ALGORITHMS = {'P-256': 'ES256', 'P-384': 'ES384', 'P-521': 'ES512'}

    def __init__(self, url, account, priv_key):
        self.url = url
        self.account = account
        # Historical misspelling, kept so existing users of it keep working.
        self.accout = account
        self.jwk = account.jwk
        self.key = priv_key
        self._get_nonce()

    def _get_nonce(self):
        """Fetch the directory, store it and the fresh nonce, return the nonce."""
        r = requests.get(self.url)
        self.nonce = r.headers.get('Replay-Nonce')
        self.dir = r.json()
        return self.nonce

    def _generate_header(self):
        """Build the JWS header and the protected header for the next request.

        Returns:
            ``(header, protected)``: ``header`` holds ``alg`` and ``jwk``;
            ``protected`` is a deep copy of it with the current nonce added.

        Raises:
            ValueError: the key type or the EC curve is not supported.
        """
        kty = self.jwk.get('kty')
        if kty == 'EC':
            crv = self.jwk.get('crv')
            try:
                algorithm = self._EC_ALGORITHMS[crv]
            except (KeyError, TypeError):
                raise ValueError('Unsupported EC curve: {}.'.format(crv))
        elif kty == 'RSA':
            algorithm = 'RS256'
        else:
            raise ValueError('Invalid JWK key type: {}.'.format(kty))
        header = {'alg': algorithm, 'jwk': self.jwk}
        protected = copy.deepcopy(header)
        protected['nonce'] = self.nonce
        return header, protected

    def _sign_request(self, header, protected, payload, algorithm):
        """Return the JSON-serializable JWS body for ``payload``.

        Raises:
            NotImplementedError: PyJWT has no algorithm named ``algorithm``.
        """
        # Only the lookup can mean "unsupported"; a KeyError raised while
        # signing must not be relabelled.
        try:
            algorithm_obj = get_default_algorithms()[algorithm]
        except KeyError:
            raise NotImplementedError('Algorithm not supported')
        protected = base64url_encode(force_bytes(to_json(protected)))
        payload = base64url_encode(force_bytes(to_json(payload)))
        signing_input = b'.'.join([protected, payload])
        key = algorithm_obj.prepare_key(self.key)
        signature = algorithm_obj.sign(signing_input, key)
        return dict(
            header=header,
            protected=protected.decode('ascii'),
            payload=payload.decode('ascii'),
            signature=base64url_encode(signature).decode('ascii'))

    def _post(self, url, payload, headers=None):
        """Sign ``payload``, POST it to ``url`` and return the response.

        Args:
            url: endpoint to post to.
            payload: JSON-serializable request payload.
            headers: optional extra HTTP headers; they override the default
                ``Content-Type`` when they name the same header.

        The nonce for the next request is taken from the reply.
        """
        http_headers = {'Content-Type': 'application/json'}
        if headers:
            http_headers.update(headers)
        header, protected = self._generate_header()
        dat = self._sign_request(header, protected, payload, header.get('alg'))
        response = requests.post(url, json=dat, headers=http_headers)
        self.nonce = response.headers.get('Replay-Nonce')
        return response

    def get_authorization(self, uri):
        """GET ``uri`` and return its parsed JSON body.

        Raises:
            AcmeError: the body could not be parsed as JSON.
        """
        r = requests.get(uri)
        try:
            return r.json()
        except (ValueError, TypeError, AttributeError) as e:
            # Hand AcmeError the response (not the exception) like the
            # other call sites do.
            raise AcmeError(r) from e

    def new_authorization(self, domain):
        """Request a new authorization for the DNS name ``domain``.

        Returns:
            NewAuthorizationResult with the JSON body and the ``Location``
            header of the ``201 Created`` response.

        Raises:
            AcmeError: the server answered with any other status.
        """
        r = self._post(self.dir.get('new-authz'), {
            'resource': 'new-authz',
            'identifier': {'type': 'dns', 'value': domain},
        })
        if r.status_code == 201:  # Created
            return NewAuthorizationResult(r.json(), r.headers.get('Location'))
        raise AcmeError(r)

    def validate_authorization(self, uri, validate_type, key_authorizatioin):
        """Tell the server that the challenge at ``uri`` is ready to be checked.

        Returns:
            True when the server answers with a 2xx status.

        Raises:
            AcmeError: the server answered with any other status.
        """
        response = self._post(uri, {
            'resource': 'challenge',
            'type': validate_type,
            'keyAuthorization': key_authorizatioin,
        })
        if 200 <= response.status_code < 300:
            return True
        raise AcmeError(response)

    def issue_certificate(self, csr):
        """Submit ``csr`` to the ``new-cert`` endpoint and return the result.

        Args:
            csr: certificate signing request object providing
                ``public_bytes()``.

        Returns:
            IssuanceResult with the PEM certificate, the ``Location`` header
            and the PEM intermediate certificate.  The intermediate is
            ``None`` when the response carries no ``up`` link.

        Raises:
            AcmeError: the server did not answer ``201 Created``.
        """
        http_headers = {'Accept': 'application/pkix-cert'}
        response = self._post(self.dir['new-cert'], {
            'resource': 'new-cert',
            'csr': base64url_encode(csr.public_bytes(
                serialization.Encoding.DER)).decode('ascii'),
        }, headers=http_headers)
        if response.status_code != 201:  # anything but Created
            raise AcmeError(response)
        # der to pem
        cer = x509.load_der_x509_certificate(
            response.content, default_backend()
        ).public_bytes(serialization.Encoding.PEM)
        # Stays None without an "up" link, so the return below always has a
        # bound value.
        chain = None
        inter = response.links.get('up')  # intermediate certificate
        if inter:
            inter_der = requests.get(
                inter['url'], headers=http_headers).content
            chain = x509.load_der_x509_certificate(
                inter_der, default_backend()
            ).public_bytes(serialization.Encoding.PEM)
        return IssuanceResult(cer, response.headers.get('Location'), chain)
# Result of Acme.new_authorization(): `contents` is the parsed JSON body of
# the response and `uri` is the value of its Location header.
NewAuthorizationResult = namedtuple('NewAuthorizationResult', 'contents uri')
# Result of Acme.issue_certificate(): `certificate` is the PEM-encoded
# certificate, `location` the response's Location header and `intermediate`
# the PEM-encoded certificate fetched from the response's "up" link.
IssuanceResult = namedtuple('IssuanceResult', 'certificate location intermediate')
def get_challenge(auth, method):
    """Return the challenge of the requested type from an authorization.

    Args:
        auth: object whose ``contents`` mapping may hold a ``'challenges'``
            list of challenge dicts.
        method: challenge type, compared with each entry's ``'type'``.

    Returns:
        The first matching challenge, or ``None`` when nothing matches.
    """
    offered = auth.contents.get('challenges') or []
    matching = list(filter(lambda item: item.get('type') == method, offered))
    return matching[0] if matching else None
def main():
    """Request a certificate for the CSR stored in ``cert-test.csr``.

    Loads the account key and registration data from ``account-test.key`` and
    ``account-test.json``, submits the CSR and writes the issued certificate,
    the intermediate certificate and the certificate's location to
    ``cert-test.crt``.
    """
    # test issue
    with open('account-test.key', 'rb') as key, open('account-test.json') as jf:
        info = json.loads(jf.read())
        pkey = serialization.load_pem_private_key(
            key.read(), None, default_backend())
    account = Account(info['contents']['key'], uri=info['uri'])
    acme = Acme(LE_SERVER, account, pkey)
    with open('cert-test.csr', 'rb') as csrf:
        csr = x509.load_pem_x509_csr(csrf.read(), default_backend())
    logging.info('Requesting certificate issuance...')
    result = acme.issue_certificate(csr)
    logging.info('Certificate issued.')
    with open('cert-test.crt', 'wb') as cer:
        cer.write(result.certificate)
        # Either value may be missing; writing None would raise TypeError
        # and leave a truncated file behind.
        if result.intermediate is not None:
            cer.write(result.intermediate)
        if result.location is not None:
            cer.write(force_bytes(result.location))
    logging.info(result.intermediate)


if __name__ == '__main__':
    main()
#!/usr/bin/env python3
import json
import copy
import base64
# requests
import requests
# PyJWT
from jwt.algorithms import get_default_algorithms
from jwt.utils import base64url_encode, to_base64url_uint, force_bytes
# cryptography
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ec
# Let's Encrypt Staging Server (for test, not issue valid certificate)
LE_SERVER = 'https://acme-staging.api.letsencrypt.org/directory'

# generate a new EC key (account key)
pkey = ec.generate_private_key(curve=ec.SECP384R1(), backend=default_backend())

# get nonce & acme directory
r = requests.get(LE_SERVER)
nonce = r.headers['Replay-Nonce']
directory = r.json()


def _b64_coordinate(value, key_size):
    """Return an EC coordinate as base64url text of the curve's full width.

    RFC 7518 section 6.2.1.2 requires "x" and "y" to span the full coordinate
    size.  A minimal-length integer encoding drops leading zero bytes, which
    makes the JWK invalid for the occasional key whose coordinate starts with
    a zero byte.
    """
    width = (key_size + 7) // 8
    return base64url_encode(value.to_bytes(width, 'big')).decode('ascii')


_public_numbers = pkey.public_key().public_numbers()

header = {
    # let's encrypt server only support curve 'P-256' and 'P-384'
    # curve 'P-256' must be signed with 'ES256'
    # curve 'P-384' must be signed with 'ES384'
    "alg": "ES384",
    "jwk": {
        "kty": "EC",
        "crv": 'P-384',
        "x": _b64_coordinate(_public_numbers.x, _public_numbers.curve.key_size),
        "y": _b64_coordinate(_public_numbers.y, _public_numbers.curve.key_size),
    }
}

# The protected header is the plain header plus the anti-replay nonce.
protected = copy.deepcopy(header)
protected['nonce'] = nonce

# Registration request: create a new account with one contact address.
payload = {'resource':'new-reg', 'contact':['mailto:me@example.com']}

# Both JWS parts are sent, and signed, in base64url form.
protected = base64url_encode(force_bytes(json.dumps(protected)))
payload = base64url_encode(force_bytes(json.dumps(payload)))
def sign(input, key, algorithm='ES256'):
    """Sign ``input`` with ``key`` using the named PyJWT algorithm.

    Args:
        input: bytes to sign.
        key: key material passed to the algorithm's ``prepare_key``.
        algorithm: name looked up in PyJWT's default algorithm table.

    Returns:
        The signature produced by the algorithm object.

    Raises:
        NotImplementedError: a KeyError occurred, i.e. the algorithm name is
            not in the table.
    """
    try:
        signer = get_default_algorithms()[algorithm]
        return signer.sign(input, signer.prepare_key(key))
    except KeyError:
        raise NotImplementedError('Algorithm not supported')
# The signature covers "<protected>.<payload>", both already base64url-encoded.
signing_input = b'.'.join([protected, payload])
signature = base64url_encode(sign(signing_input, pkey, algorithm='ES384'))
# JSON body of the signed request; byte values are decoded to ASCII text so
# they can be serialized.
data = {
'header': header,
'protected': protected.decode('ascii'),
'payload': payload.decode('ascii'),
'signature': signature.decode('ascii'),
}
# POST the request to the URL the directory lists under 'new-reg'.
r = requests.post(directory['new-reg'], json=data)
# if response '201 Created', register success
print('Status: %d %s' % (r.status_code, r.reason))
# Pretty-print the JSON body of the response.
print(json.dumps(r.json(), indent=4))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment