Created
December 17, 2019 22:46
-
-
Save dbazile/78d97ad0995a3df4af742f2b1ec1fe2f to your computer and use it in GitHub Desktop.
Scripting OpenSSL to generate certificate chains easily.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import argparse | |
import datetime | |
import getpass | |
import logging | |
import os | |
import re | |
import subprocess | |
import sys | |
import textwrap | |
import time | |
import dateutil.parser | |
import dateutil.tz | |
# Dotted-quad IPv4 literal; octet ranges are not validated here.
PATTERN_IP = re.compile(r'^\d+\.\d+\.\d+\.\d+$')
# A day count such as "30d" or "30 days" (case-insensitive).
PATTERN_DAYS = re.compile(r'^\d+\s*(d|days)$', re.IGNORECASE)
# A subject made of one or more "/KEY=value" components, e.g. "/CN=foo/O=bar".
PATTERN_SUBJ = re.compile(r'^(/[^=]+=[^=]+)+$')

# Logger shared by every function in this script.
LOG: logging.Logger = logging.getLogger('certgen')
def main():
    """Command-line entry point.

    Parses the arguments, configures logging to stdout, and issues the
    requested certificate via ``issue``.

    Returns:
        0 when the certificate was issued, 1 when an ``Error`` was raised.
    """
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('--debug', action='store_true')
    parser.add_argument('--ca-cert', help='Path to CA certificate file')
    parser.add_argument('--ca-key', help='Path to CA key file')
    parser.add_argument('--cert', default='new.crt', help='Path to issued certificate')
    parser.add_argument('--key', default='new.key', help='Path to issued certificate key')
    parser.add_argument('--expires', default='30d', help='Timestamp when this certficiate expires. If ends in "d", will be interpreted as number of days from now.')
    parser.add_argument('--size', default=4096, type=int, help='Number of bits to use for key')
    parser.add_argument('--is-ca', action='store_true', help='If set, issued certificate will be marked as a CA')
    parser.add_argument('--pathlen', default=0, type=int, help='Number of intermediate CAs that can exist between this CA and issued certificates')
    parser.add_argument('--force', action='store_true', help='Overwrites existing files without prompting')
    parser.add_argument('subject', help='Subject name for issued certificate')
    parser.add_argument('san', nargs='*', help='Subject alternative name(s) for issued certificate')
    opts = parser.parse_args()

    logging.basicConfig(
        datefmt='%H:%M:%S',
        format='%(asctime)s [%(levelname)-5s] %(name)s: %(funcName)s: %(message)s',
        level='DEBUG' if opts.debug else 'INFO',
        stream=sys.stdout,
    )

    # Dump every parsed option, but only pay for the formatting in debug mode.
    if LOG.isEnabledFor(logging.DEBUG):
        option_names = [name for name in sorted(dir(opts)) if not name.startswith('_')]
        dump = '\n'.join([f' {name:>16} : {getattr(opts, name)!r}' for name in option_names])
        LOG.debug('params:\n\033[34m%s\033[0m', dump)

    try:
        # The TTL is parsed inside the try so a bad --expires is reported
        # like any other Error.
        ttl = _parse_ttl(opts.expires)
        cert_path, key_path = issue(
            cert=opts.cert,
            key=opts.key,
            subj=opts.subject,
            ttl=ttl,
            size=opts.size,
            sans=opts.san,
            is_ca=opts.is_ca,
            pathlen=opts.pathlen,
            ca_cert=opts.ca_cert,
            ca_key=opts.ca_key,
            force=opts.force,
        )
        LOG.info('\033[1m%s\033[0m', cert_path)
        LOG.info('\033[1m%s\033[0m', key_path)
    except Error as err:
        if err.details:
            LOG.error('%s\n\033[31m%s\033[0m', err, err.details)
        else:
            LOG.error('%s', err)
        LOG.debug('stack trace', exc_info=err)
        return 1

    return 0
def issue(cert, key, subj, ttl, size, sans=None, is_ca=False, pathlen=0, ca_cert=None, ca_key=None, force=False):
    """Issue a certificate/key pair, self-signed or signed by a CA.

    When neither ``ca_cert`` nor ``ca_key`` is given, a self-signed CA
    certificate is created. Otherwise a CSR is created and signed with the
    given CA.

    Args:
        cert: Output path of the certificate.
        key: Output path of the private key.
        subj: Subject; a bare name is turned into ``/CN=<name>``.
        ttl: Validity in days.
        size: RSA key size in bits.
        sans: Optional subject alternative names (signed certificates only).
        is_ca: Mark a signed certificate as a CA.
        pathlen: ``pathlen`` basic constraint for CA certificates.
        ca_cert: Path to the signing CA certificate.
        ca_key: Path to the signing CA key.
        force: Overwrite existing output files.

    Returns:
        Tuple ``(cert, key)`` of the written paths.

    Raises:
        Error: On invalid arguments, existing output files (without
            ``force``), or an openssl failure.
    """
    if not cert:
        raise Error('cert is required')
    if not key:
        raise Error('key is required')

    # A CA certificate without its key (or vice versa) cannot sign anything;
    # fail here instead of handing ``None`` to openssl.
    if bool(ca_cert) != bool(ca_key):
        raise Error('ca_cert and ca_key must be provided together')

    # Both outputs are always regenerated, so either one existing is enough
    # to refuse.
    if not force:
        existing = [path for path in (cert, key) if os.path.isfile(path)]
        if existing:
            raise Error('refusing to overwrite existing file(s): ' + ', '.join(repr(path) for path in existing))

    subj = _parse_subj(subj)
    password = _read_password('Enter password for new certificate: ', 'CERT_PASSWORD')

    if not ca_cert and not ca_key:
        return _issue_ca(cert, key, password, subj, size, ttl, pathlen)

    ca_password = _read_ca_password(ca_key)
    return _issue_signed(cert, key, password, subj, ttl, size, sans, is_ca, pathlen, ca_cert, ca_key, ca_password)
def _cleanup(*paths):
    """Delete each of ``paths``, logging (not raising) any failure."""
    for target in paths:
        LOG.debug('delete: %s', target)
        try:
            os.remove(target)
        except OSError as err:
            # Best effort: a leftover temporary file must not abort the run.
            LOG.error('delete failed: %s', err)
            LOG.debug('stack trace', exc_info=err)
def _issue_ca(cert, key, password, subj, size, ttl, pathlen):
    """Create a self-signed CA certificate with ``openssl req -x509``.

    A temporary openssl config is written next to ``cert`` (``<cert>.config``)
    and removed again whether or not openssl succeeds.

    Args:
        cert: Output path of the certificate.
        key: Output path of the private key.
        password: Passphrase handed to openssl via ``-passout``.
        subj: Subject string passed to ``-subj``.
        size: RSA key size in bits.
        ttl: Validity in days.
        pathlen: Value of the ``pathlen`` basic constraint.

    Returns:
        Tuple ``(cert, key)``.

    Raises:
        Error: If openssl exits with a non-zero status.
    """
    config = textwrap.dedent(
        f'''
        [ req_ext ]
        authorityKeyIdentifier = keyid,issuer:always
        basicConstraints = critical, CA:TRUE, pathlen:{pathlen}
        keyUsage = critical, digitalSignature, cRLSign, keyCertSign
        subjectKeyIdentifier = hash
        [ req ]
        distinguished_name = dn
        [ dn ]
        commonName = CN
        '''
    )

    config_path = cert + '.config'
    LOG.debug('write config: path=%s\n\033[34m%s\033[0m', config_path, config)
    with open(config_path, 'w') as f:
        f.write(config)

    try:
        rc, out = _openssl(
            'req',
            '-x509',
            '-config', config_path,
            '-subj', subj,
            # Time-derived serial number.
            '-set_serial', int(time.time() * 10000),
            '-days', ttl,
            '-passout', f'pass:{password}',
            '-newkey', f'rsa:{size}',
            '-keyout', key,
            '-out', cert,
            '-extensions', 'req_ext'
        )
    finally:
        # Remove the temporary config on failure too, not only on success.
        _cleanup(config_path)

    if rc:
        raise Error('could not issue self-signed certificate', out)

    return cert, key
def _issue_signed(cert, key, password, subj, ttl, size, sans, is_ca, pathlen, ca_cert, ca_key, ca_password):
    """Create a key and CSR, then sign the CSR with the given CA.

    Two temporary files are written next to ``cert`` (``<cert>.config`` with
    the X.509 extensions and ``<cert>.csr``); both are removed whether or not
    openssl succeeds.

    Args:
        cert: Output path of the certificate.
        key: Output path of the private key.
        password: Passphrase for the new key (``-passout``).
        subj: Subject string passed to ``-subj``.
        ttl: Validity in days.
        size: RSA key size in bits.
        sans: Iterable of subject alternative names, or None/empty.
        is_ca: If true, the certificate gets CA basic constraints.
        pathlen: ``pathlen`` constraint, used only when ``is_ca`` is true.
        ca_cert: Path to the signing CA certificate.
        ca_key: Path to the signing CA key.
        ca_password: Passphrase of the CA key (``-passin``).

    Returns:
        Tuple ``(cert, key)``.

    Raises:
        Error: If creating or signing the CSR fails.
    """
    # Function-scope import keeps this block self-contained.
    import ipaddress

    config = textwrap.dedent(
        f'''\
        authorityKeyIdentifier = keyid,issuer:always
        basicConstraints = critical, CA:TRUE, pathlen:{pathlen}
        keyUsage = critical, digitalSignature, keyEncipherment, cRLSign, keyCertSign
        extendedKeyUsage = clientAuth, serverAuth
        subjectKeyIdentifier = hash
        '''
        if is_ca else
        f'''\
        authorityKeyIdentifier = keyid,issuer:always
        basicConstraints = CA:FALSE
        keyUsage = critical, digitalSignature, keyEncipherment
        extendedKeyUsage = clientAuth, serverAuth
        subjectKeyIdentifier = hash
        '''
    )

    LOG.info('issuing %r: cert=%s key=%s ca=%s', subj, cert, key, ca_cert)

    if sans:
        config += 'subjectAltName = @alt_names\n'
        config += '[alt_names]\n'
        added_sans = set()
        for i, san in enumerate(sorted(sans), start=1):
            if san in added_sans:
                LOG.warning('skip duplicate SAN: index=%d value=%s', i, san)
                continue
            added_sans.add(san)

            # IPv4-looking values keep going through PATTERN_IP so openssl
            # still rejects malformed ones; IPv6 literals are detected with
            # the ipaddress module so they are not emitted as DNS names.
            is_ip = bool(PATTERN_IP.match(san))
            if not is_ip:
                try:
                    ipaddress.IPv6Address(san)
                except ValueError:
                    pass
                else:
                    is_ip = True

            kind = 'IP' if is_ip else 'DNS'
            config += '%s.%d = %s\n' % (kind, i, san)

    config_path = f'{cert}.config'
    LOG.debug('write config: path=%s\n\033[34m%s\033[0m', config_path, config)
    with open(config_path, 'w') as f:
        f.write(config)

    csr = f'{cert}.csr'
    try:
        LOG.debug('create csr: csr=%s key=%s', csr, key)
        rc, out = _openssl(
            'req',
            '-new',
            '-subj', subj,
            '-passout', f'pass:{password}',
            '-newkey', f'rsa:{size}',
            '-keyout', key,
            '-out', csr,
        )
        if rc:
            raise Error('could not create CSR', out)

        LOG.debug('sign csr: cert=%s key=%s ca_cert=%s ca_key=%s', cert, key, ca_cert, ca_key)
        rc, out = _openssl(
            'x509',
            '-req',
            '-sha256',
            '-CA', ca_cert,
            '-CAkey', ca_key,
            # Time-derived serial number.
            '-set_serial', int(time.time() * 10000),
            '-passin', f'pass:{ca_password}',
            '-days', ttl,
            '-in', csr,
            '-out', cert,
            '-extfile', config_path,
        )
        if rc:
            raise Error('could not sign CSR', out)
    finally:
        # Remove temporaries on failure as well; the CSR may not exist if
        # the first openssl call failed, so only delete what is there.
        _cleanup(*(path for path in (config_path, csr) if os.path.exists(path)))

    return cert, key
def _now():
    """Return the current time as a timezone-aware datetime in UTC."""
    utc = dateutil.tz.UTC
    return datetime.datetime.now(tz=utc)
def _openssl(*params) -> 'tuple[int, str]':
    """Run ``openssl`` with the given arguments.

    Args:
        *params: Command-line arguments; each is converted with ``str()``.

    Returns:
        Tuple ``(returncode, output)`` where ``output`` is the combined
        stdout/stderr, stripped and decoded. If the executable cannot be
        started, ``returncode`` is 127 and ``output`` describes the OS error.
    """
    command = ['openssl'] + [str(v) for v in params]

    if LOG.isEnabledFor(logging.DEBUG):
        # Passphrases are passed as "pass:<secret>" arguments; mask them so
        # they never end up in the log.
        shown = ['pass:***' if s.startswith('pass:') else s for s in command]
        LOG.debug('\033[33m+ %s\033[0m', ' '.join(repr(s) if ' ' in s else s for s in shown))

    rc = 0
    try:
        out = subprocess.check_output(command, stderr=subprocess.STDOUT)
    except subprocess.CalledProcessError as e:
        out = e.stdout
        rc = e.returncode
    except OSError as e:
        # E.g. openssl is not installed. Report it through the normal
        # (rc, out) channel so callers turn it into an Error.
        return 127, f'could not execute openssl: {e}'

    # errors='replace' keeps a stray non-UTF-8 byte from masking the result.
    return rc, out.strip().decode(errors='replace')
def _parse_subj(value: str) -> str:
    """Normalise a subject argument.

    A value already in ``/KEY=value[/KEY=value...]`` form is returned
    stripped; anything else is used as the common name.
    """
    subject = value.strip()
    is_full_subject = PATTERN_SUBJ.match(subject) is not None
    return subject if is_full_subject else f'/CN={subject}'
def _parse_ttl(value: str) -> int:
    """Convert an expiry argument into a number of days.

    Args:
        value: Either a day count matching ``PATTERN_DAYS`` ("30d",
            "30 days") or a timestamp understood by ``dateutil.parser.parse``.

    Returns:
        The day count, or for a timestamp the number of whole days between
        today's date and the timestamp's date (the result may be zero or
        negative).

    Raises:
        Error: If ``value`` is empty or cannot be parsed.
    """
    value = value.strip()
    if not value:
        raise Error('missing value')

    if PATTERN_DAYS.match(value):
        # The suffix can be "d" or "days", optionally preceded by whitespace,
        # so take the leading digits instead of chopping one character.
        return int(re.match(r'\d+', value).group())

    try:
        t = dateutil.parser.parse(value)
    except Exception as e:
        LOG.debug('stack trace', exc_info=e)
        raise Error(f'could not parse ttl: {e}; value={value!r}', e) from e

    # Compare dates only: both sides are truncated to midnight and labelled
    # UTC (the parsed value's own tzinfo is replaced, not converted).
    now = _now().replace(hour=0, minute=0, second=0, microsecond=0, tzinfo=dateutil.tz.UTC)
    exp = t.replace(hour=0, minute=0, second=0, microsecond=0, tzinfo=dateutil.tz.UTC)
    return (exp - now).days
def _read_ca_password(ca_key):
    """Obtain the CA key passphrase and verify it against ``ca_key``.

    The passphrase comes from the ``CA_PASSWORD`` environment variable or,
    failing that, an interactive prompt. It is checked with
    ``openssl rsa -check``; a wrong interactive entry is re-prompted.

    Args:
        ca_key: Path to the CA private key.

    Returns:
        The verified passphrase.

    Raises:
        Error: If ``ca_key`` is missing, the key cannot be read, or the
            passphrase taken from ``CA_PASSWORD`` is wrong.
    """
    if not ca_key:
        raise Error('ca_key is required')

    # A wrong passphrase from the environment would be returned again on
    # every retry, so it must be an error rather than a re-prompt.
    from_env = bool(os.getenv('CA_PASSWORD'))

    while True:
        value = _read_password('Enter CA password: ', 'CA_PASSWORD')

        # Check key password
        rc, out = _openssl(
            'rsa',
            '-check',
            '-noout',
            '-in', ca_key,
            '-passin', f'pass:{value}',
        )
        if not rc:
            return value

        if 'bad decrypt' not in out:
            raise Error(f'could not read CA key (path={ca_key})', out)

        if from_env:
            raise Error('Incorrect password for CA (from CA_PASSWORD environment variable)', out)

        LOG.warning('Incorrect password for CA')
def _read_password(msg, env): | |
if not env: | |
raise Error('env is required') | |
if not msg: | |
raise Error('msg is required') | |
value = os.getenv(env.strip()) | |
while not value: | |
value = getpass.getpass(msg).strip() | |
return value | |
class Error(Exception):
    """Failure raised by this script.

    Attributes:
        message: The stripped error message (also the exception text).
        details: Optional extra context, e.g. openssl output.
    """

    def __init__(self, message, details=None):
        if not isinstance(message, str):
            raise TypeError('message must be str')
        self.message = message.strip()
        self.details = details
        super().__init__(self.message)
if __name__ == '__main__':
    # sys.exit is used because the bare ``exit`` helper is injected by the
    # ``site`` module and is not guaranteed to exist.
    try:
        sys.exit(main())
    except KeyboardInterrupt:
        # Exit non-zero so an interrupted run is not mistaken for success
        # (130 = 128 + SIGINT, the shell convention).
        sys.exit(130)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment