Skip to content

Instantly share code, notes, and snippets.

@dbazile
Created December 17, 2019 22:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dbazile/78d97ad0995a3df4af742f2b1ec1fe2f to your computer and use it in GitHub Desktop.
Save dbazile/78d97ad0995a3df4af742f2b1ec1fe2f to your computer and use it in GitHub Desktop.
scripting openssl to generate certificate chains easily
#!/usr/bin/env python3
import argparse
import datetime
import getpass
import logging
import os
import re
import subprocess
import sys
import textwrap
import time
import dateutil.parser
import dateutil.tz
# Dotted-quad check used to decide whether a SAN is written as an IP or a DNS
# entry. Only the shape is checked (four dot-separated digit runs), not the
# octet range.
PATTERN_IP = re.compile(r'^\d+\.\d+\.\d+\.\d+$')

# A day count such as "30d", "30 D" or "30days" (case-insensitive).
PATTERN_DAYS = re.compile(r'^\d+\s*(d|days)$', flags=re.IGNORECASE)

# An OpenSSL-style subject made of one or more "/KEY=value" components.
PATTERN_SUBJ = re.compile(r'^(/[^=]+=[^=]+)+$')

# Module-wide logger.
LOG: logging.Logger = logging.getLogger('certgen')
def main():
    """Command-line entry point.

    Parses the arguments, configures logging to stdout, and issues a
    certificate/key pair through ``issue``.

    Returns:
        0 when the certificate was issued, 1 when ``issue`` (or the parsing of
        ``--expires``) raised ``Error``.
    """
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('--debug', action='store_true')
    parser.add_argument('--ca-cert', help='Path to CA certificate file')
    parser.add_argument('--ca-key', help='Path to CA key file')
    parser.add_argument('--cert', default='new.crt', help='Path to issued certificate')
    parser.add_argument('--key', default='new.key', help='Path to issued certificate key')
    parser.add_argument(
        '--expires',
        default='30d',
        help='Timestamp when this certficiate expires. If ends in "d", will be interpreted as number of days from now.',
    )
    parser.add_argument('--size', default=4096, type=int, help='Number of bits to use for key')
    parser.add_argument(
        '--is-ca',
        action='store_true',
        help='If set, issued certificate will be marked as a CA',
    )
    parser.add_argument(
        '--pathlen',
        default=0,
        type=int,
        help='Number of intermediate CAs that can exist between this CA and issued certificates',
    )
    parser.add_argument(
        '--force',
        action='store_true',
        help='Overwrites existing files without prompting',
    )
    parser.add_argument('subject', help='Subject name for issued certificate')
    parser.add_argument('san', nargs='*', help='Subject alternative name(s) for issued certificate')
    args = parser.parse_args()

    logging.basicConfig(
        datefmt='%H:%M:%S',
        format='%(asctime)s [%(levelname)-5s] %(name)s: %(funcName)s: %(message)s',
        level='DEBUG' if args.debug else 'INFO',
        stream=sys.stdout,
    )

    # Dump every public attribute of the parsed namespace when debugging.
    if LOG.isEnabledFor(logging.DEBUG):
        public_names = (name for name in sorted(dir(args)) if not name.startswith('_'))
        dump = '\n'.join(f' {name:>16} : {getattr(args, name)!r}' for name in public_names)
        LOG.debug('params:\n\033[34m%s\033[0m', dump)

    try:
        # _parse_ttl is evaluated inside the try so its Error is reported too.
        cert_path, key_path = issue(
            subj=args.subject,
            cert=args.cert,
            key=args.key,
            ttl=_parse_ttl(args.expires),
            size=args.size,
            sans=args.san,
            is_ca=args.is_ca,
            pathlen=args.pathlen,
            ca_cert=args.ca_cert,
            ca_key=args.ca_key,
            force=args.force,
        )
    except Error as err:
        if err.details:
            LOG.error('%s\n\033[31m%s\033[0m', err, err.details)
        else:
            LOG.error('%s', err)
        LOG.debug('stack trace', exc_info=err)
        return 1

    LOG.info('\033[1m%s\033[0m', cert_path)
    LOG.info('\033[1m%s\033[0m', key_path)
    return 0
def issue(cert, key, subj, ttl, size, sans=None, is_ca=False, pathlen=0, ca_cert=None, ca_key=None, force=False):
    """Issue a certificate and private key.

    With neither ``ca_cert`` nor ``ca_key`` a self-signed CA certificate is
    created; with both, a certificate signed by that CA is created.

    Args:
        cert: Output path of the certificate.
        key: Output path of the private key.
        subj: Subject; either a full ``/KEY=value`` subject or a bare name
            that is turned into ``/CN=<name>``.
        ttl: Validity in days.
        size: RSA key size in bits.
        sans: Subject alternative names (only used for CA-signed certificates).
        is_ca: Mark a CA-signed certificate as a CA itself.
        pathlen: ``pathlen`` basic constraint for CA certificates.
        ca_cert: Path to the signing CA certificate.
        ca_key: Path to the signing CA key.
        force: Allow overwriting existing output files.

    Returns:
        The ``(cert, key)`` paths that were written.

    Raises:
        Error: On invalid arguments, existing output files (without
            ``force``), or an openssl failure.
    """
    if not cert:
        raise Error('cert is required')
    if not key:
        raise Error('key is required')

    # A CA is only usable as a pair; with just one half the missing path
    # would otherwise reach openssl as the literal string "None".
    if bool(ca_cert) != bool(ca_key):
        raise Error('ca_cert and ca_key must be provided together')

    # Refuse to clobber *either* output file, not only when both exist.
    if not force:
        existing = [path for path in (cert, key) if os.path.isfile(path)]
        if existing:
            raise Error('refusing to overwrite existing file(s): ' + ', '.join(repr(p) for p in existing))

    subj = _parse_subj(subj)
    password = _read_password('Enter password for new certificate: ', 'CERT_PASSWORD')

    if not ca_cert and not ca_key:
        return _issue_ca(cert, key, password, subj, size, ttl, pathlen)

    ca_password = _read_ca_password(ca_key)
    return _issue_signed(cert, key, password, subj, ttl, size, sans, is_ca, pathlen, ca_cert, ca_key, ca_password)
def _cleanup(*paths):
    """Delete every given path, logging (not raising) any ``OSError``."""
    for target in paths:
        LOG.debug('delete: %s', target)
        try:
            os.unlink(target)
        except OSError as err:
            # Best effort: report the failure and carry on with the rest.
            LOG.error('delete failed: %s', err)
            LOG.debug('stack trace', exc_info=err)
def _issue_ca(cert, key, password, subj, size, ttl, pathlen):
    """Create a self-signed CA certificate with ``openssl req -x509``.

    A temporary openssl config is written to ``<cert>.config`` and removed
    again whether or not openssl succeeds.

    Args:
        cert: Output path of the certificate.
        key: Output path of the private key.
        password: Password handed to openssl via ``-passout``.
        subj: Subject string for ``-subj``.
        size: RSA key size in bits.
        ttl: Validity in days.
        pathlen: Value of the ``pathlen`` basic constraint.

    Returns:
        The ``(cert, key)`` paths.

    Raises:
        Error: If openssl exits with a non-zero status.
    """
    config = textwrap.dedent(
        f'''
        [ req_ext ]
        authorityKeyIdentifier = keyid,issuer:always
        basicConstraints = critical, CA:TRUE, pathlen:{pathlen}
        keyUsage = critical, digitalSignature, cRLSign, keyCertSign
        subjectKeyIdentifier = hash
        [ req ]
        distinguished_name = dn
        [ dn ]
        commonName = CN
        '''
    )

    config_path = cert + '.config'
    LOG.debug('write config: path=%s\n\033[34m%s\033[0m', config_path, config)
    with open(config_path, 'w') as f:
        f.write(config)

    try:
        rc, out = _openssl(
            'req',
            '-x509',
            '-config', config_path,
            '-subj', subj,
            # Timestamp-derived serial number.
            '-set_serial', int(time.time() * 10000),
            '-days', ttl,
            '-passout', f'pass:{password}',
            '-newkey', f'rsa:{size}',
            '-keyout', key,
            '-out', cert,
            '-extensions', 'req_ext',
        )
    finally:
        # Remove the temporary config even when openssl fails or raises.
        _cleanup(config_path)

    if rc:
        raise Error('could not issue self-signed certificate', out)

    return cert, key
def _issue_signed(cert, key, password, subj, ttl, size, sans, is_ca, pathlen, ca_cert, ca_key, ca_password):
    """Create a key and CSR, then sign the CSR with the given CA.

    Two intermediate files are written next to the certificate
    (``<cert>.config`` and ``<cert>.csr``); both are removed afterwards,
    including when one of the openssl steps fails.

    Args:
        cert: Output path of the certificate.
        key: Output path of the private key.
        password: Password for the new key (``-passout``).
        subj: Subject string for ``-subj``.
        ttl: Validity in days.
        size: RSA key size in bits.
        sans: Optional iterable of subject alternative names; values that
            match ``PATTERN_IP`` become ``IP`` entries, the rest ``DNS``.
        is_ca: Whether the issued certificate gets CA extensions.
        pathlen: ``pathlen`` basic constraint (only used when ``is_ca``).
        ca_cert: Path to the signing CA certificate.
        ca_key: Path to the signing CA key.
        ca_password: Password of the CA key (``-passin``).

    Returns:
        The ``(cert, key)`` paths.

    Raises:
        Error: If creating or signing the CSR fails.
    """
    config = textwrap.dedent(
        f'''\
        authorityKeyIdentifier = keyid,issuer:always
        basicConstraints = critical, CA:TRUE, pathlen:{pathlen}
        keyUsage = critical, digitalSignature, keyEncipherment, cRLSign, keyCertSign
        extendedKeyUsage = clientAuth, serverAuth
        subjectKeyIdentifier = hash
        '''
        if is_ca else
        f'''\
        authorityKeyIdentifier = keyid,issuer:always
        basicConstraints = CA:FALSE
        keyUsage = critical, digitalSignature, keyEncipherment
        extendedKeyUsage = clientAuth, serverAuth
        subjectKeyIdentifier = hash
        '''
    )

    LOG.info('issuing %r: cert=%s key=%s ca=%s', subj, cert, key, ca_cert)

    if sans:
        san_lines = ['subjectAltName = @alt_names', '[alt_names]']
        added_sans = set()
        # The index comes from the sorted input, so a skipped duplicate
        # leaves a gap in the numbering; entry names stay unique.
        for i, san in enumerate(sorted(sans), start=1):
            if san in added_sans:
                LOG.warning('skip duplicate SAN: index=%d value=%s', i, san)
                continue
            added_sans.add(san)
            kind = 'IP' if PATTERN_IP.match(san) else 'DNS'
            san_lines.append(f'{kind}.{i} = {san}')
        config += '\n'.join(san_lines) + '\n'

    config_path = f'{cert}.config'
    csr = f'{cert}.csr'

    LOG.debug('write config: path=%s\n\033[34m%s\033[0m', config_path, config)
    with open(config_path, 'w') as f:
        f.write(config)

    try:
        LOG.debug('create csr: csr=%s key=%s', csr, key)
        rc, out = _openssl(
            'req',
            '-new',
            '-subj', subj,
            '-passout', f'pass:{password}',
            '-newkey', f'rsa:{size}',
            '-keyout', key,
            '-out', csr,
        )
        if rc:
            raise Error('could not create CSR', out)

        LOG.debug('sign csr: cert=%s key=%s ca_cert=%s ca_key=%s', cert, key, ca_cert, ca_key)
        rc, out = _openssl(
            'x509',
            '-req',
            '-sha256',
            '-CA', ca_cert,
            '-CAkey', ca_key,
            # Timestamp-derived serial number.
            '-set_serial', int(time.time() * 10000),
            '-passin', f'pass:{ca_password}',
            '-days', ttl,
            '-in', csr,
            '-out', cert,
            '-extfile', config_path,
        )
        if rc:
            raise Error('could not sign CSR', out)
    finally:
        # Always remove the intermediates. The CSR may not exist if the
        # first step failed, so only delete what is actually there.
        _cleanup(*(path for path in (config_path, csr) if os.path.exists(path)))

    return cert, key
def _now():
    """Return the current time as a timezone-aware datetime in UTC."""
    utc = dateutil.tz.UTC
    return datetime.datetime.now(tz=utc)
def _openssl(*params) -> 'tuple[int, str]':
    """Run the ``openssl`` command-line tool.

    Args:
        *params: Arguments for openssl; each one is converted with ``str()``.

    Returns:
        ``(returncode, output)`` where ``output`` is the stripped, decoded
        combination of stdout and stderr. ``returncode`` is 0 on success.

    Raises:
        Error: If the ``openssl`` executable cannot be found.
    """
    command = ['openssl'] + [str(v) for v in params]

    if LOG.isEnabledFor(logging.DEBUG):
        # Never write passwords to the log: mask "pass:<secret>" arguments.
        shown = ['pass:****' if s.startswith('pass:') else s for s in command]
        LOG.debug('\033[33m+ %s\033[0m', ' '.join(repr(s) if ' ' in s else s for s in shown))

    rc = 0
    try:
        out = subprocess.check_output(command, stderr=subprocess.STDOUT)
    except subprocess.CalledProcessError as e:
        out = e.stdout
        rc = e.returncode
    except FileNotFoundError as e:
        # Surface a missing binary as an Error so callers report it cleanly.
        raise Error('openssl executable not found', str(e)) from e

    # openssl output is not guaranteed to be valid UTF-8; do not crash on it.
    return rc, out.strip().decode(errors='replace')
def _parse_subj(value: str) -> str:
    """Normalize a subject string.

    Input that already matches ``PATTERN_SUBJ`` (``/KEY=value...``) is
    returned stripped; anything else is wrapped as ``/CN=<value>``.
    """
    candidate = value.strip()
    return candidate if PATTERN_SUBJ.match(candidate) else f'/CN={candidate}'
def _parse_ttl(value: str) -> int:
    """Convert an expiry specification into a number of days.

    Args:
        value: Either a day count matching ``PATTERN_DAYS`` (e.g. ``30d``,
            ``30 d``, ``30days``) or a timestamp that
            ``dateutil.parser.parse`` understands.

    Returns:
        The number of whole days. For a timestamp this is the difference
        between its date and today's UTC date, with time of day ignored.

    Raises:
        Error: If the value is empty or cannot be parsed.
    """
    value = value.strip()
    if not value:
        raise Error('missing value')

    if PATTERN_DAYS.match(value):
        # Take only the leading digits: slicing off a single character
        # breaks on the "days" suffix ("30days" -> int("30day")).
        return int(re.match(r'\d+', value).group())

    try:
        t = dateutil.parser.parse(value)
    except (ValueError, OverflowError) as e:
        LOG.debug('stack trace', exc_info=e)
        raise Error(f'could not parse ttl: {e}; value={value!r}', e) from e

    # Compare dates only: truncate both sides to midnight and label both UTC.
    now = _now().replace(hour=0, minute=0, second=0, microsecond=0, tzinfo=dateutil.tz.UTC)
    exp = t.replace(hour=0, minute=0, second=0, microsecond=0, tzinfo=dateutil.tz.UTC)
    return (exp - now).days
def _read_ca_password(ca_key):
    """Obtain the CA key password and verify it against the key file.

    The password is read through ``_read_password`` (environment variable
    ``CA_PASSWORD`` or an interactive prompt) and checked with
    ``openssl rsa -check``. A wrong interactive password is asked for again.

    Args:
        ca_key: Path to the CA private key.

    Returns:
        The verified password.

    Raises:
        Error: If the key cannot be read for a reason other than a wrong
            password, or if the password taken from ``CA_PASSWORD`` is wrong.
    """
    # _read_password returns the environment value whenever it is set, so a
    # wrong value there would otherwise make this loop spin forever.
    from_env = bool(os.getenv('CA_PASSWORD'))

    while True:
        value = _read_password('Enter CA password: ', 'CA_PASSWORD')

        # Check key password
        rc, out = _openssl(
            'rsa',
            '-check',
            '-noout',
            '-in', ca_key,
            '-passin', f'pass:{value}',
        )
        if not rc:
            return value

        if 'bad decrypt' not in out:
            raise Error(f'could not read CA key (path={ca_key})', out)

        if from_env:
            raise Error('incorrect CA password in environment variable CA_PASSWORD', out)

        LOG.warning('Incorrect password for CA')
def _read_password(msg, env):
    """Return a password from the environment or an interactive prompt.

    Args:
        msg: Prompt text shown by ``getpass``.
        env: Name of the environment variable consulted first.

    Returns:
        The non-empty value of the environment variable if set, otherwise the
        first non-empty (stripped) answer typed at the prompt.

    Raises:
        Error: If ``env`` or ``msg`` is empty.
    """
    if not env:
        raise Error('env is required')
    if not msg:
        raise Error('msg is required')

    secret = os.getenv(env.strip())
    if secret:
        return secret

    # Keep prompting until something non-blank is entered.
    while True:
        secret = getpass.getpass(msg).strip()
        if secret:
            return secret
class Error(Exception):
    """Application error carrying a message and optional details.

    ``message`` is the stripped text also used as the exception's string
    form; ``details`` is stored as given (``None`` by default).
    """

    def __init__(self, message, details=None):
        if not isinstance(message, str):
            raise TypeError('message must be str')
        cleaned = message.strip()
        self.message = cleaned
        self.details = details
        super().__init__(cleaned)
if __name__ == '__main__':
    # sys.exit instead of the bare ``exit`` helper, which is injected by the
    # ``site`` module and is not guaranteed to exist (e.g. ``python -S``).
    try:
        sys.exit(main())
    except KeyboardInterrupt:
        # Ctrl-C is not a success: use the conventional 128 + SIGINT status.
        sys.exit(130)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment