Skip to content

Instantly share code, notes, and snippets.

@mildsunrise
Last active September 19, 2023 09:11
Show Gist options
  • Star 7 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mildsunrise/adb4068650484d9fe354a3ee4238eed3 to your computer and use it in GitHub Desktop.
Save mildsunrise/adb4068650484d9fe354a3ee4238eed3 to your computer and use it in GitHub Desktop.
πŸ” Open source implementation of FNMT's certificate configurator v1.0.1 (https://twitter.com/mild_sunrise/status/1585611873860440067)
#!/usr/bin/env python3
'''
Open source implementation of FNMT's certificate configurator v1.0.1
<https://www.sede.fnmt.gob.es/descargas/descarga-software/instalacion-software-generacion-de-claves>
No warranty provided; use this ONLY if you know what you're doing.
Usage: ./fnmt_handle.py <fnmtcr URL>
Fulfills the request indicated by the URL, sending request to answer operation as completed if there are no errors.
For the fnmtcr://request phase, the generated private key is written to "privkey.pem" in current directory.
For the fnmtcr://install phase, the received PKCS#7 / X.509 blob is written to "cert.der" in current directory.
After both 'request' and 'install' are done, you can build a PKCS#12 store from them:
$ openssl pkcs7 -print_certs -inform der -in cert.der -out certs.cer
$ openssl pkcs12 -export -in certs.cer -inkey privkey.pem -out certificate.p12
'''
import sys
import gzip
import OpenSSL.crypto
from cryptography.hazmat.primitives import serialization, ciphers, asymmetric
from urllib.parse import urlparse, parse_qs, urlencode, unquote
from urllib.request import urlopen, Request
from base64 import b64decode, urlsafe_b64encode, urlsafe_b64decode
from xml.etree import ElementTree
# MAIN CODE
def __main__():
args = sys.argv[1:]
if len(args) != 1:
print(__doc__.strip(), file=sys.stderr)
exit(2)
url, = args
url = urlparse(url)
assert url.scheme == 'fnmtcr', f'Unexpected URL schema: {repr(url.scheme)}'
handlers = { 'request': do_request, 'install': do_install }
assert url.netloc in handlers, f'Unimplemented operation {repr(url.netloc)}, try with official configurator'
handler = handlers[url.netloc]
assert url.path == '', f'Unexpected URL path: {repr(url.path)}'
params = ensure_unique(parse_qs(url.query))
if 'rtservlet' in params:
retrieve_parameters(params)
common_params = {}
for k in { 'rtservlet', 'stservlet', 'key', 'fileid' }:
if k in params:
common_params[k] = params.pop(k)
answer_data = handler(params)
send_data(answer_data, common_params)
# HANDLERS
def do_request(params: dict[str, str]) -> bytes:
csrtype = params.pop('csrtype', 'spkac').lower()
keytype = params.pop('keytype', 'rsa').lower()
keylength = int(params.pop('keylength', '2048'))
forcecard = params.pop('forcecard', None)
assert keytype == 'rsa' and csrtype in { 'spkac', 'mozilla', 'firefox' } and forcecard != 'true' and not params, \
'Unimplemented "request" parameters, try with official configurator'
key = asymmetric.rsa.generate_private_key(65537, keylength)
with open('privkey.pem', 'xb') as f:
f.write(key.private_bytes(serialization.Encoding.PEM, serialization.PrivateFormat.TraditionalOpenSSL, serialization.NoEncryption()))
# cryptography doesn't implement SPKAC. PyOpenSSL does, but you can't set the challenge. the openssl tool does, but only allows specifying digest on v3+. sigh.
dangerous_hacky_set_spki_challenge = lambda spki, challenge: \
OpenSSL.crypto._lib.ASN1_STRING_set(OpenSSL.crypto._ffi.cast('ASN1_IA5STRING ***', spki._spki)[0][1], challenge.encode(), -1)
spkac = OpenSSL.crypto.NetscapeSPKI()
dangerous_hacky_set_spki_challenge(spkac, '16074851')
ossl_key = OpenSSL.crypto.PKey.from_cryptography_key(key)
spkac.set_pubkey(ossl_key)
spkac.sign(ossl_key, 'sha256')
return b64decode(spkac.b64_encode())
def do_install(params: dict[str, str]) -> bytes:
assert 'cert' in params, '"cert" parameter not present, I do not know what to do'
cert = params.pop('cert')
if params:
print(f'warning: Unknown parameters: {params}')
cert = urlsafe_b64decode(cert)
try:
cert = gzip.decompress(cert)
except gzip.BadGzipFile:
print('note: Invalid GZIP, assuming not compressed.')
with open('cert.der', 'xb') as f:
f.write(cert)
return b'OK'
# SERVER COMMUNICATION
def retrieve_parameters(params: dict[str, str]):
retrieval_servlet_url = params['rtservlet']
print('requesting additional parameters to:', repr(retrieval_servlet_url))
body = { 'op': 'get', 'v': '1', 'id': params['fileid'] }
with urlopen(Request(validate_url(retrieval_servlet_url + '?' + urlencode(body)), method='GET')) as resp:
body = resp.read().decode()
assert not body.upper().startswith('ERR'), f'error response from server: {body}'
body = decipher_data(body, params['key'].encode()).decode() if params.get('key') else body
for node in ElementTree.fromstring(body):
assert node.tag == 'e' and set(node.attrib) == {'k', 'v'} and not list(node), f'unexpected node {node}'
params[node.attrib['k']] = unquote(node.attrib['v'])
def send_data(data: bytes, params: dict[str, str]):
storage_servlet_url = params['stservlet']
print('submitting completion request to:', repr(storage_servlet_url))
data = cipher_data(data, params.get('key', '').encode())
body = { 'op': 'put', 'v': '1_0', 'id': params['fileid'], 'dat': data }
with urlopen(Request(validate_url(storage_servlet_url), urlencode(body).encode('ascii'), method='POST')) as resp:
body = resp.read()
print(f'server response: status={resp.code}, body={body}')
assert body.strip() == b'OK', 'unexpected response from server'
CIPHER_ALG = ciphers.algorithms.TripleDES
def cipher_data(data: bytes, key: bytes) -> str:
if not key:
return urlsafe_b64encode(data).decode('ascii')
assert len(key) == 8, 'invalid key length'
padding_len = (-len(data)) % (CIPHER_ALG.block_size // 8)
data += b'\0' * padding_len
cipher = ciphers.Cipher(CIPHER_ALG(key), ciphers.modes.ECB()).encryptor()
data = cipher.update(data) + cipher.finalize()
return f'{padding_len}.' + urlsafe_b64encode(data).decode('ascii')
def decipher_data(data: str, key: bytes) -> bytes:
if not key:
return urlsafe_b64decode(data)
assert len(key) == 8, 'invalid key length'
if not ( (idx := data.find('.')) != -1 and (padding_len := data[:idx]).isdigit() and (padding_len := int(padding_len)) < 8 ):
raise AssertionError('invalid encrypted data')
data = urlsafe_b64decode(data[idx + 1:])
cipher = ciphers.Cipher(CIPHER_ALG(key), ciphers.modes.ECB()).decryptor()
data = cipher.update(data) + cipher.finalize()
return data[:len(data) - padding_len]
# OTHER
def ensure_unique(params: dict[str, list[str]]) -> dict[str, str]:
new_params = {}
for k, v in params.items():
assert len(v) == 1, f'Unexpected duplicate parameter {repr(k)}'
new_params[k] = v[0]
return new_params
def validate_url(url: str) -> str:
parsed = urlparse(url)
allowed_suffixes = [ '.fnmt.es', '.fnmt.gob.es' ]
if not (parsed.scheme.lower() == 'https' and any(parsed.hostname.lower().endswith(suffix) for suffix in allowed_suffixes)):
answer = input(f'allow request to {repr(parsed.hostname)}? [y/n]: ')
assert answer == 'y', 'user rejected request'
return url
if __name__ == '__main__': __main__()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment