Skip to content

Instantly share code, notes, and snippets.

@thomdixon
Last active October 14, 2021 10:25
Show Gist options
  • Save thomdixon/bc3d664b6305adec9ecbc155b5ca3b6d to your computer and use it in GitHub Desktop.
Save thomdixon/bc3d664b6305adec9ecbc155b5ca3b6d to your computer and use it in GitHub Desktop.
Simple implementation of SSH certificates for Python using Paramiko. Supports serializing from and to Message objects.
from collections import OrderedDict
import paramiko
from paramiko.agent import AgentKey
from paramiko.message import Message
CERT_ATTRIBUTE_TO_TYPE = {
'name': 'string',
'nonce': 'string',
# ecdsa
'curve': 'string',
'public_key': 'string',
# rsa
'e': 'mpint',
'n': 'mpint',
# ed25519
'pk': 'string',
# dss
'p': 'mpint',
'q': 'mpint',
'g': 'mpint',
'y': 'mpint',
'serial': 'int64',
'type': 'int',
'key_id': 'string',
'valid_principals': 'string',
'valid_after': 'int64',
'valid_before': 'int64',
'critical_options': 'string',
'extensions': 'string',
'reserved': 'string',
'signature_key': 'string',
'signature': 'string'
}
def _get_cert_params(name):
"""Map a certificate name to the list of parameters within it."""
head = ('name', 'nonce')
tail = (
'serial', 'type', 'key_id', 'valid_principals', 'valid_after', 'valid_before',
'critical_options', 'extensions', 'reserved', 'signature_key', 'signature'
)
if name == 'ssh-rsa-cert-v01@openssh.com':
key_specific = ('e', 'n')
elif name == 'ssh-ed25519-cert-v01@openssh.com':
key_specific = ('pk',)
elif name == 'ssh-dss-cert-v01@openssh.com':
key_specific = ('p', 'q', 'g', 'y')
elif name in (
'ecdsa-sha2-nistp256-cert-v01@openssh.com',
'ecdsa-sha2-nistp384-cert-v01@openssh.com',
'ecdsa-sha2-nistp521-cert-v01@openssh.com'
):
key_specific = ('curve', 'public_key')
else:
raise ValueError('unsupported certificate type: %s' % name)
return head + key_specific + tail
class SSHCertificate(object):
"""An SSH certificate."""
def _has_params(self):
return hasattr(self, '_params') and self._params
def to_message(self):
result = Message()
if not self._has_params():
return result
for param in self._params:
value = getattr(self, param)
getattr(result, 'add_%s' % CERT_ATTRIBUTE_TO_TYPE[param])(value)
return result
def to_dict(self):
result = OrderedDict() # can use regular dict if you don't care about order
if not self._has_params():
return result
for param in self._params:
result[param] = getattr(self, param)
return result
def to_agent_key(self, agent=None):
as_bytes = self.to_message().asbytes()
return AgentKey(agent, as_bytes)
@classmethod
def from_dict(cls, d):
"""Create an SSH certificate from a dict."""
if 'cert' not in d['name']:
raise ValueError('does not contain a certificate')
keys = d.keys()
params = _get_cert_params(d['name'])
has = set(keys)
required = set(params)
if has != required:
raise ValueError('missing required parameters: %s' % (required - has))
# create an SSHCertificate instance to populate
result = cls()
# remember the order in which we will dump params later
result._params = params
# hydrate the object
for key, value in d.iteritems():
setattr(result, key, value)
return result
@classmethod
def from_message(cls, m):
m.rewind()
name = m.get_string()
m.rewind()
# duplicated, but fail fast
if 'cert' not in name:
raise ValueError('does not contain a certificate')
cert = {
param: getattr(m, 'get_%s' % CERT_ATTRIBUTE_TO_TYPE[param])()
for param in _get_cert_params(name)
}
return cls.from_dict(cert)
@classmethod
def from_agent_key(cls, k):
m = Message(k.blob)
return cls.from_message(m)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment