Last active
May 19, 2017 17:23
-
-
Save adiroiban/d4af3aa243924609cd54e504b9fa2895 to your computer and use it in GitHub Desktop.
pyopenssl SSLContext.set_verify callback for CRL / CDP validation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class CRLValidator(object):
    """
    Helper for validating certificates against CRL.

    For now it only supports a single CRL signed by a single CA.

    It provides the `CRLValidator.validate(certificate)` method which can be
    used as a callback for `SSLContext.set_verify`.
    """

    # Source of randomness used to add a few seconds of jitter to the CRL
    # reload delay.
    # NOTE(review): presumably kept as a class attribute so that it can be
    # replaced from tests - confirm.
    _random = random
def __init__(self, scheduler, update_interval, event_emitter):
    """
    Create a validator which is neither configured nor started.

    `scheduler` is the clock used for the delayed CRL reloads. The global
    reactor is used when a false value is passed.
    `update_interval` is the number of seconds between CRL reloads, with
    `0` meaning that the reload time is computed from the CRL itself.
    `event_emitter` is the callable used to emit events.
    """
    # HTTP client used to get CRL over HTTP.
    self._http_agent = NonPersistentAgent()
    self._emitEvent = event_emitter
    # Time in seconds after which the CRL is loaded.
    # 0 for auto-reload.
    self._update_interval = update_interval
    self._scheduler = scheduler or reactor
    self._resetBeforeStartState()
def _resetBeforeStartState(self): | |
""" | |
Set the validator state as before starting. | |
""" | |
# Deferred attached to the next update for the CRL location. | |
self._next_update_deferreds = {} | |
# Mapping from CA subject to CA cert used to validate CRL signature. | |
self._ca_certs = {} | |
self._running = False | |
# Mapping between CA subject and list of revoked ids. | |
self._revoked_serials = {} | |
# Mapping between CRL location to issuer. | |
self._crl_location_to_issuer = {} | |
# True when validator should auto-load CRL from CDP. | |
self._use_cdp = False | |
def start(self):
    """
    Mark the validator as usable, once it was configured.

    Raise ServerException when no CA or no CRL was loaded.
    """
    requirements = (
        (self._ca_certs, u'CA was not set to validate the CRL.'),
        (self._revoked_serials, u'No CRL was configured.'),
        )
    for configured, error_message in requirements:
        if not configured:
            raise ServerException(error_message)
    self._running = True
def stop(self):
    """
    Cancel the pending CRL updates and forget the whole configuration.
    """
    for pending in self._next_update_deferreds.values():
        if pending.called:
            # Already fired, so there is nothing left to cancel.
            continue
        pending.cancel()
    self._resetBeforeStartState()
@property
def running(self):
    """
    `True` after the validator was started and until it is stopped.
    """
    return self._running
@defer.inlineCallbacks
def configure(self, ca, crls, certificate):
    """
    Load the CA from the PEM file at path `ca` and the initial CRLs.

    When `crls` contains the CDP marker, the initial CRL is the one
    advertised by `certificate`. Otherwise `crls` is the list of CRL
    locations to load.

    Return a deferred which is fired when the CRL is configured.
    """
    self._addCAFromPath(path=ca)

    use_cdp = TYPE_NAME.CRL_DISTRIBUTION_POINTS in crls
    if use_cdp:
        loading = self._configureCDP(start_certificate=certificate)
    else:
        loading = self._addCRLs(crls=crls)

    # Wait for initial CRL cache to be loaded.
    yield loading
def _addCAFromPath(self, path):
    """
    Load the CA certificates used to validate the CRL signature from the
    PEM file found at the local `path`.

    Raise ServerException when a CA is already loaded, when `path` is a
    folder or when the file contains no certificate.
    """
    if self._ca_certs:
        # For now we don't support multiple calls to this method since
        # we don't need it.
        # This can be changed if we somehow need it.
        raise ServerException(
            u'Only a single CA file can be loaded for CRL validation.')

    segments = local_filesystem.getSegmentsFromRealPath(path)
    if local_filesystem.isFolder(segments):
        raise ServerException(
            u'CRL can only be used when CA certificates are stored '
            u'in a single file.'
            )

    source = None
    try:
        source = local_filesystem.openFileForReading(segments)
        pem_objects = pem.parse(source.read())
    finally:
        if source:
            source.close()

    loaded = {}
    for entry in pem_objects:
        if isinstance(entry, pem.Certificate):
            # Anything which is not a certificate (ex. keys) is ignored.
            certificate = crypto.load_certificate(
                crypto.FILETYPE_PEM, entry.as_bytes())
            subject = x509.get_subject(
                certificate.get_subject().get_components(),
                u'CA loaded from %s' % (path,),
                )
            loaded[subject] = certificate

    if not loaded:
        raise ServerException(
            u'CA file used to validated the CRL has no certificates.')

    self._ca_certs.update(loaded)
def _addCRLs(self, crls):
    """
    Load each CRL location from the `crls` list.

    Returns a deferred which is fired when the CRLs were loaded,
    successfully or not.

    On failure, only the error from the first failed CRL is raised.
    If you want to catch errors from all CRLs call this separately with
    a CRL at a time.
    """
    def cb_load_location(method_and_location):
        get_method, location = method_and_location
        if self._crl_location_to_issuer.get(location):
            # Location is already loaded. Nothing new to do.
            return None
        return self._updateCRL(get_method, location)

    def cb_first_failure(results):
        """
        This is fired from the DeferredList and is used to convert it
        into a simple Deferred failure.
        """
        for location, (succeeded, outcome) in zip(crls, results):
            if not succeeded:
                raise self._getCRLLoadErrorWithInvalidation(
                    outcome.value, location)

    pending = []
    for crl in crls:
        # Do an initial load to forward exceptions to caller and
        # then schedule the periodic updates.
        loading = self._getCRLMethod(crl_configuration=crl)
        loading.addCallback(cb_load_location)
        pending.append(loading)

    combined = defer.DeferredList(pending, consumeErrors=True)
    combined.addCallback(cb_first_failure)
    return combined
def _configureCDP(self, start_certificate):
    """
    Switch the validator to the CRLs advertised via the CDP extension.

    The first CRL advertised by `start_certificate` is loaded right away,
    to cache the most probable CDP in use.

    Returns a deferred which is fired when the initial CRL was loaded or
    is errbacked on errors.
    """
    self._use_cdp = True

    advertised = x509.get_CRLs_from_CDP_extension(
        certificate=start_certificate)
    if advertised:
        # FIXME:3494:
        # We should support multiple CRL locations and look at loading
        # all the advertised CRL.
        return self._addCRLs(crls=advertised[:1])

    missing_cdp = ServerException(
        u'No CDP extension defined for the local certificate.')
    return defer.fail(self._getCRLLoadErrorWithInvalidation(
        error=missing_cdp,
        location='crl-distribution-points',
        ))
def validate(self, peer_certificate):
    """
    Check that `peer_certificate` is not listed in the CRL of its issuer.

    Raise ServerException with details if the certificate is revoked or
    if it can not be checked (validator not started, CRL not loaded, or
    missing CDP extension while CDP is in use).
    """
    if not self._running:
        raise ServerException(u'CRL validation was not yet started.')

    issuer = x509.get_subject(
        peer_certificate.get_issuer().get_components(),
        'peer certificate'
        )
    revoked_serials = self._revoked_serials.get(
        issuer, _CRL_NOT_YET_LOADED)

    crls = None
    if self._use_cdp:
        # When CDP is used, clients should always use a certificate which
        # is advertising a CDP.
        crls = x509.get_CRLs_from_CDP_extension(peer_certificate)
        if not crls:
            raise ServerException(
                u'CDP is in usage and peer certificate '
                u'has no CDP extension.'
                )

    if revoked_serials is not _CRL_NOT_YET_LOADED:
        # We have a revocation list for this issuer.
        if peer_certificate.get_serial_number() in revoked_serials:
            raise ServerException(u'Revoked certificate.')
        return

    if not self._use_cdp:
        # CDP are not used and CRL for this issuer is not yet
        # loaded.
        raise ServerException(
            u'CRL not yet loaded for %s.' % (issuer,))

    # We are using CDP, and we don't have a CRL yet.
    # Trigger the load, if not already in progress, and reject the
    # certificate for now.
    raise ServerException(self._loadCDP(
        crls=crls, peer_certificate=peer_certificate))
def _loadCDP(self, crls, peer_certificate): | |
""" | |
Load a CRL at runtime, as advertised in the CDP extension. | |
Return a human readable message describing the action which was | |
taken. | |
""" | |
# FIXME:3494: | |
# We should support multiple CRL locations and look at loading | |
# all the advertised CRL. | |
crls = crls[:1] | |
if ( | |
crls[0] in self._next_update_deferreds and | |
not self._next_update_deferreds[crls[0]].called | |
): | |
# CRL is in the processes of being updated so until it is | |
# loaded, we fail. | |
return u'CDP/CRL for the peer certificate is not loaded yet.' | |
# Make sure we have a subject for the peer certificate. | |
peer_subject = x509.get_subject( | |
peer_certificate.get_subject().get_components(), | |
'peer_certificate during CDP/CRL', | |
) | |
# No CRL yet for this peer certificate, so we trigger their loading. | |
deferred = self._addCRLs(crls=crls) | |
deferred.addErrback(lambda failure: emit_error_details( | |
error=failure.value, | |
event_id=u'20178', | |
data={'peer_subject': peer_subject}, | |
emitter=self._emitEvent, | |
)) | |
self._cdp_auto_load_deferred = deferred | |
# We fail this validation as we don't have yet a CRL. | |
return u'CDP/CRL for the peer certificate is being loaded.' | |
def _now(self):
    """
    Return the scheduler's current time as a timezone aware UTC datetime.
    """
    seconds = self._scheduler.seconds()
    return datetime.utcfromtimestamp(seconds).replace(tzinfo=UTC)
def _getCRLLoadErrorWithInvalidation(self, error, location):
    """
    Invalidate the CRL currently loaded from `location` and return the
    ServerError describing the failed load operation.
    """
    self._invalidateCRL(location)
    return ServerError(
        u'20036',
        data={
            'uri': location,
            'details': unicode(error),
            },
        )
def _invalidateCRL(self, location): | |
""" | |
Invalidate the CRL which is already loaded from location. | |
""" | |
try: | |
issuer = self._crl_location_to_issuer[location] | |
self._revoked_serials[issuer] = _CRL_NOT_YET_LOADED | |
except KeyError: | |
# It might be that we were not able to load the location so we | |
# don't know its issuer. | |
pass | |
def _getCRLMethod(self, crl_configuration):
    """
    Find out how the CRL from `crl_configuration` should be retrieved.

    Return a deferred which is fired with tuple of
    (get_method, location) for the single `crl_configuration`, or which
    fails when the location is empty or uses an unknown scheme.
    """
    if not crl_configuration:
        return defer.fail(ServerException(
            u'CRL location can not be empty.'))

    url = urlparse(crl_configuration)

    # Windows absolute path without a scheme.
    is_windows_path = crl_configuration[1:3] == ':\\'
    if is_windows_path or url.scheme == '':
        # No scheme is fallback to local path.
        return defer.succeed((self._loadCRLFromPath, crl_configuration))

    if url.scheme == 'file':
        if not url.netloc:
            # We have absolute unix path.
            location = url.path
        elif url.path:
            # We have Unix relative path.
            # FIXME:3480:
            # Might need to store it as absolute path.
            location = url.netloc + url.path
        else:
            # We have a windows path or a relative path comprised of
            # a single segment.
            # FIXME:3480:
            # Might need to store it as absolute path.
            location = url.netloc
        return defer.succeed((self._loadCRLFromPath, location))

    if url.scheme == 'http':
        return defer.succeed((self._loadCRLFromURL, crl_configuration))

    return defer.fail(ServerException(
        u'Unknown method for loading CRL: %s' % (url.scheme,)))
@defer.inlineCallbacks
def _updateCRL(self, get_method, location):
    """
    Load or reload the revoked serials from the CRL found at `location`,
    as retrieved by `get_method`, and schedule the next reload.

    The CRL must be signed by one of the loaded CA certificates.
    Fails with ServerException when the CRL can not be accepted.
    """
    loading = defer.maybeDeferred(get_method, location)
    # Let other know that we are loading a CRL for this location.
    # It will be later overwritten by the schedule call, but in
    # case someone else want to know if there is already a deferred for
    # this location, it will check via this dict.
    self._next_update_deferreds[location] = loading
    raw_crl = yield loading

    if not raw_crl:
        raise ServerException(u'Empty CRL at the configured location.')

    details = x509.crl_load(raw_crl)
    issuer = details['issuer']

    if issuer not in self._ca_certs:
        raise ServerException(
            u'CRL signed by a CA which is not trusted: %s' % (issuer,))
    signer = self._ca_certs[issuer]

    try:
        crypto.verify(
            signer,
            details['signature'],
            details['to-be-signed'],
            details['digest-algorithm'],
            )
    except crypto.Error as error:
        raise ServerException(u'CRL with invalid signature: %s' % (error,))

    serials = [entry['serial'] for entry in details['revoked']]

    known_issuer = self._crl_location_to_issuer.get(location, issuer)
    if known_issuer != issuer:
        # The CRL previously loaded from here can no longer be trusted.
        self._revoked_serials[known_issuer] = _CRL_NOT_YET_LOADED
        raise ServerException(
            u'Reseting current CRL. '
            u'Issuer of the CRL at %s was changed from "%s" to "%s".' % (
                location, known_issuer, issuer))

    # Check to see that the current issuer is not already loaded from
    # another location.
    for loaded_location, loaded_issuer in (
        self._crl_location_to_issuer.items()
            ):
        if loaded_issuer == issuer and loaded_location != location:
            # FIXME:3494:
            # This should be flexible as we should allow CRL fallback and
            # in that case we would have multiple CRL for a single issuer.
            raise ServerException(
                u'CRL for %s is already loaded from %s. '
                u'It was tried to also be loaded from %s.' % (
                    issuer, loaded_location, location))

    self._revoked_serials[issuer] = serials
    self._crl_location_to_issuer[location] = issuer

    next_update = details['next-update']
    delay = self._getCRLUpdateDelay(details)
    self._scheduleCRLNextUpdate(delay, get_method, location, next_update)

    self._emitEvent(u'20037', data={
        'uri': location,
        'issuer': issuer,
        'count': len(serials),
        'update_datetime': self._now() + timedelta(seconds=delay),
        'update_seconds': delay,
        'next_publish': details['next-publish'],
        'next_update': next_update,
        })
def _loadCRLFromPath(self, path):
    """
    Return the raw content of the CRL file stored at the local `path`.

    Raise ServerException when `path` is a folder.
    """
    segments = local_filesystem.getSegmentsFromRealPath(path)
    if local_filesystem.isFolder(segments):
        raise ServerException(
            u'Loading CRL from directories is not supported.')

    source = local_filesystem.openFileForReading(segments)
    try:
        return source.read()
    finally:
        source.close()
def _loadCRLFromURL(self, url):
    """
    Return a deferred fired with the raw CRL retrieved via HTTP GET from
    `url`.

    The deferred fails with ServerException for any response other
    than 200.
    """
    def cb_read_successful_response(response):
        if response.code == 200:
            return self._http_agent.readBody(response)
        raise ServerException(
            u'Failed to retrieve the CRL at %s over HTTP: %s %s' % (
                url, response.code, response.phrase))

    # FIXME:3465:
    # HTTP request should be done via a redirection agent.
    request = self._http_agent.request('GET', url.encode('utf-8'))
    request.addCallback(cb_read_successful_response)
    return request
def _getCRLUpdateDelay(self, crl_details): | |
""" | |
Return the seconds after which the CRL should be updated. | |
""" | |
next_update = crl_details['next-update'] | |
next_publish = crl_details['next-publish'] | |
if next_update: | |
now = self._now() | |
crl_delta = next_update - now | |
next_load = crl_delta.total_seconds() | |
if next_load <= 0: | |
raise ServerException( | |
u'CRL had expired on UTC %s. Now is %s' % ( | |
next_update.isoformat(), now.isoformat())) | |
if next_publish: | |
if next_publish > next_update: | |
raise ServerException( | |
u'CRL next publish at %s UTC is after ' | |
u'next update at %s UTC.' % ( | |
next_publish.isoformat(), next_update.isoformat())) | |
# We have the CRL Next Publish extension, so use that value. | |
crl_delta = next_publish - now | |
next_publish_load = crl_delta.total_seconds() | |
if next_publish_load <= 0: | |
# We are pass the next publish time. | |
# Retry using the soft retry or next update, | |
# which comes first. | |
next_load = min(next_load, _CRL_SOFT_RELOAD_DELAY) | |
else: | |
next_load = next_publish_load | |
if self._update_interval: | |
# We have a pre-configured refresh rate. | |
next_load = self._update_interval | |
else: | |
# If the current CRL had expired, there is no next update to | |
# schedule. | |
if not next_update: | |
raise ServerException(u'CRL has no nextUpdate field.') | |
# Use the refresh value advertised in the certificate to | |
# compute the next refresh time. | |
# Refresh it with a random delay of up to 60 seconds so what we | |
# don't hit it at the exact refresh time. | |
next_load = next_load + self._random.randint(0, 60) | |
return next_load | |
def _scheduleCRLNextUpdate(self, delay, get_method, location, next_update):
    """
    Schedule the reload of the CRL from `location` after `delay` seconds.

    `get_method` is the callable used to retrieve the raw CRL.
    `next_update` is the datetime after which the currently loaded CRL
    is no longer valid, or `None` when the CRL does not advertise one.

    When the reload fails, the failure is reported as an event and a new
    reload is scheduled after the soft reload delay, or at the time the
    current CRL expires if that is sooner. Once the current CRL is
    expired it is invalidated.
    """
    def eb_update(failure, location):
        if failure.check(defer.CancelledError):
            # We are stopping so this is a normal way to signal.
            return

        now = self._now()
        if next_update is None:
            # The loaded CRL has no expiration date, so there is nothing
            # to compare against. Comparing `None` with a datetime
            # raises TypeError and would stop all further reloads.
            next_load = _CRL_SOFT_RELOAD_DELAY
        elif next_update <= now:
            # The CRL is no longer valid.
            self._invalidateCRL(location)
            next_load = _CRL_SOFT_RELOAD_DELAY
        else:
            # Retry after the soft reload or at the final next update,
            # which is sooner, so that we don't run with a loaded CRL
            # past the Next Update date.
            # Delay an extra 1 second so that on fast system with low
            # clock resolution we don't fail too often.
            next_load = (next_update - now).total_seconds() + 1
            next_load = min(next_load, _CRL_SOFT_RELOAD_DELAY)

        # Try reloading the CRL, using the retry delay computed above
        # and not the delay of the reload which has just failed.
        self._scheduleCRLNextUpdate(
            next_load, get_method, location, next_update)

        self._emitEvent(u'20038', data={
            'uri': location,
            'next_load': next_load,
            'details': unicode(failure.value),
            })

    deferred = task.deferLater(
        clock=self._scheduler,
        delay=delay,
        callable=self._updateCRL,
        get_method=get_method,
        location=location,
        )
    deferred.addErrback(eb_update, location)
    # Remembered so that `stop` can cancel it, and so that others can
    # check if an update is pending for this location.
    self._next_update_deferreds[location] = deferred
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Copyright (c) 2016 Adi Roiban. | |
# See LICENSE for details. | |
""" | |
Handling of X.509 files. | |
""" | |
from builtins import range | |
import datetime | |
from binascii import a2b_base64 | |
from Crypto.Util import asn1 | |
from OpenSSL import crypto | |
from pyasn1.codec.ber import decoder as ber_decoder | |
from pyasn1.error import PyAsn1Error | |
from pyasn1.type import univ | |
from pyasn1_modules import rfc2459 | |
from chevah.server import from_utf8 | |
from chevah.server.commons import UTC | |
from chevah.server.commons.exception import ServerException | |
# Mapping from ASN.1 object identifiers, as tuples of integers, to the names
# used by this module.
OID_to_name = {
    # DN oids.
    (2, 5, 4, 3): 'CN',  # common name
    (2, 5, 4, 6): 'C',  # country
    (2, 5, 4, 7): 'L',  # locality
    (2, 5, 4, 8): 'ST',  # stateOrProvince
    (2, 5, 4, 10): 'O',  # organization
    (2, 5, 4, 11): 'OU',  # organizationalUnit
    (0, 9, 2342, 19200300, 100, 1, 25): 'DC',  # domainComponent
    (1, 2, 840, 113549, 1, 9, 1): 'emailAddress',

    # Digest algorithms, keyed by the OID of the RSA signature algorithm
    # (xxxWithRSAEncryption) using them.
    # See: http://www.alvestrand.no/objectid/1.2.840.113549.1.1.html
    (1, 2, 840, 113549, 1, 1, 4): b'md5',
    (1, 2, 840, 113549, 1, 1, 5): b'sha1',
    (1, 2, 840, 113549, 1, 1, 11): b'sha256',
    (1, 2, 840, 113549, 1, 1, 12): b'sha384',
    (1, 2, 840, 113549, 1, 1, 13): b'sha512',
    (1, 2, 840, 113549, 1, 1, 14): b'sha224',
    }

# Reverse mapping, from name to object identifier.
name_to_OID = {value: key for key, value in OID_to_name.items()}
def _from_bits(bits): | |
""" | |
Return a string of bytes from a list of 0/1 bits. | |
""" | |
chars = [] | |
for b in range(len(bits) / 8): | |
byte = bits[b * 8:(b + 1) * 8] | |
chars.append(chr(int(''.join([str(bit) for bit in byte]), 2))) | |
return ''.join(chars) | |
def _decode_time(obj, time_name): | |
""" | |
Decode x.509 time objects. | |
See RFC 5280. | |
It supports UTCTime only in GMT. | |
""" | |
if obj is None: | |
return None | |
time_obj = obj.getComponentByName('utcTime') | |
if time_obj is None: | |
# We don't have utcTime, so we try with generalized time. | |
time_obj = obj.getComponentByName('generalTime') | |
if time_obj is None: | |
# We should not hit this place as a certificate should be encoded | |
# with either URC or Generalized time. | |
raise ServerException(u'No time found in CRL for %s.' % (time_name,)) | |
time_formats = { | |
# 4.1.2.5.1. UTCTime. | |
23: "%y%m%d%H%M%SZ", | |
# 4.1.2.5.2. GeneralizedTime | |
# ASN1 defines multiple formats for formats, RFC 5280 mandates a single | |
# format. | |
24: "%Y%m%d%H%M%SZ", | |
} | |
time_format = time_formats[time_obj.tagSet.uniq[1]] | |
return datetime.datetime.strptime( | |
time_obj.asOctets(), time_format).replace(tzinfo=UTC) | |
def _decode_rdn_components(rdn_sequence):
    """
    Return a list of (name, value) tuples, one for each
    RelativeDistinguishedName from `rdn_sequence`.

    Only the first attribute of each RelativeDistinguishedName is used.

    Raise ServerException when an attribute type is not known.
    """
    parts = []
    index = 0
    while True:
        try:
            component = rdn_sequence.getComponentByPosition(index)
        except IndexError:
            # Past the last element of the sequence.
            break

        component = component.getComponentByPosition(0)
        comp_type = component.getComponentByName('type').asTuple()
        try:
            name = OID_to_name[comp_type]
        except KeyError:
            # Report it in the same way as the unsupported signature
            # algorithm, instead of leaking a bare KeyError.
            raise ServerException(
                u'CRL issuer attribute not supported: %s' % (comp_type,))

        # NOTE(review): the first 2 octets are dropped, presumably the
        # ASN.1 tag and a single byte length of the still encoded value.
        # That would not hold for values of 128 bytes or more - confirm.
        value = component.getComponentByName('value').asOctets()[2:]
        parts.append((name, value))
        index += 1

    return parts
def crl_parse(crl_der):
    """
    Return a dictionary with the CRL attributes of the CRL passed as DER.

    The result has the following keys: `to-be-signed`, `issuer`,
    `signature`, `digest-algorithm`, `this-update`, `next-update`,
    `next-publish`, `revoked` and `extensions`.

    Raise ServerException when the CRL can not be decoded or when it is
    signed using an algorithm which is not supported.

    RFC 5280

    CertificateList  ::=  SEQUENCE  {
        tbsCertList          TBSCertList,
        signatureAlgorithm   AlgorithmIdentifier,
        signatureValue       BIT STRING  }

    TBSCertList  ::=  SEQUENCE  {
        version                 Version OPTIONAL,
                                    -- if present, MUST be v2
        signature               AlgorithmIdentifier,
        issuer                  Name,
        thisUpdate              Time,
        nextUpdate              Time OPTIONAL,
        revokedCertificates     SEQUENCE OF SEQUENCE  {
            userCertificate         CertificateSerialNumber,
            revocationDate          Time,
            crlEntryExtensions      Extensions OPTIONAL
                                    -- if present, version MUST be v2
                                }  OPTIONAL,
        crlExtensions           [0]  EXPLICIT Extensions OPTIONAL
                                    -- if present, version MUST be v2
                                }

    Version  ::=  INTEGER  {  v1(0), v2(1), v3(2)  }

    CertificateSerialNumber  ::=  INTEGER

    Time ::= CHOICE {
        utcTime        UTCTime,
        generalTime    GeneralizedTime }
    """
    try:
        crl_asn1 = ber_decoder.decode(
            crl_der, asn1Spec=rfc2459.CertificateList())[0]
    except PyAsn1Error as error:
        # The exception is formatted directly, as its `message` attribute
        # is deprecated and not available on all Python versions.
        raise ServerException(u'Failed to decode CRL: %s' % (error,))

    to_be_signed = crl_asn1.getComponentByName('tbsCertList')
    issuer = to_be_signed.getComponentByName('issuer').getComponentByName('')
    this_update = to_be_signed.getComponentByName('thisUpdate')
    next_update = to_be_signed.getComponentByName('nextUpdate')
    signature_value = crl_asn1.getComponentByName('signature')

    signature_algorithm = crl_asn1.getComponentByName('signatureAlgorithm')
    algorithm = signature_algorithm.getComponentByName('algorithm').asTuple()
    try:
        digest_name = OID_to_name[algorithm]
    except KeyError:
        raise ServerException(
            u'CRL signature algorithm not supported: %s' % (algorithm,))

    revoked = to_be_signed.getComponentByName('revokedCertificates')
    revoked_certificates = _parse_revoked(revoked)

    parsed_extensions = to_be_signed.getComponentByName('crlExtensions')
    extensions = _parse_extensions(parsed_extensions)

    # We get the raw data to be signed as pyasn1 has no easy way to export
    # the decoded data... other than re-encoding it, and we might re-encode
    # it in a different order and the signature will be invalid.
    crl_seq = asn1.DerSequence()
    crl_seq.decode(crl_der)

    # Look for next publish extension and expose it as root attribute.
    next_publish = None
    for extension in extensions:
        if extension['name'] != 'CRL Next Publish':
            continue
        next_publish = extension['content']['nextPublish']

    return {
        # First element of the outer sequence is the raw tbsCertList.
        'to-be-signed': crl_seq[0],
        'issuer': get_subject(
            _decode_rdn_components(issuer),
            'loaded CRL issuer',
            ),
        'signature': _from_bits(signature_value),
        'digest-algorithm': digest_name,
        'this-update': _decode_time(this_update, u'thisUpdate'),
        'next-update': _decode_time(next_update, u'nextUpdate'),
        'next-publish': next_publish,
        'revoked': revoked_certificates,
        'extensions': extensions,
        }
def _parse_revoked(revoked): | |
""" | |
Return the list with certificates in the revocation list. | |
""" | |
if not revoked: | |
return [] | |
revoked_certificates = [] | |
index = 0 | |
while True: | |
try: | |
revoked_certificate = revoked.getComponentByPosition(index) | |
except IndexError: | |
break | |
serial = revoked_certificate.getComponentByName('userCertificate') | |
revoked_certificates.append({ | |
'serial': int(serial), | |
}) | |
index += 1 | |
return revoked_certificates | |
def _parse_extensions(parsed_extensions): | |
""" | |
Return the list of extensions as serialized in ASN.1 `parsed_extensions` | |
sequence. | |
""" | |
if not parsed_extensions: | |
return [] | |
def oid_to_name(oid): | |
""" | |
Convert the CRL extension OID to a human readable name. | |
Return the same `oid` if extension is not known. | |
""" | |
mapping = { | |
# Microsoft CertSrv Infrastructure | |
# https://support.microsoft.com/en-us/kb/287547 | |
'1.3.6.1.4.1.311.21.4': u'CRL Next Publish', | |
# Standard extensions. | |
# As defined in RFC 5280. | |
'2.5.29.9': u'Subject Directory Attributes', | |
'2.5.29.14': u'Subject Key Identifier', | |
'2.5.29.15': u'Key Usage', | |
'2.5.29.16': u'Private Key Usage Period', | |
'2.5.29.17': u'Subject Alternative Name', | |
'2.5.29.18': u'Issuer Alternative Name', | |
'2.5.29.19': u'Basic Constraints', | |
'2.5.29.20': u'CRL Number', | |
'2.5.29.21': u'Revocation Reason', | |
'2.5.29.24': u'Invalidity Date', | |
'2.5.29.27': u'Delta CRL Indicator', | |
'2.5.29.28': u'Issuing Distribution Point', | |
'2.5.29.29': u'Certificate Issuer', | |
'2.5.29.30': u'Name Constraints', | |
'2.5.29.31': u'CRL Distribution Points', | |
'2.5.29.32': u'Certificate Policies', | |
'2.5.29.33': u'Policy Mappings', | |
'2.5.29.35': u'Authority Key Identifier', | |
'2.5.29.36': u'Policy Constraints', | |
'2.5.29.37': u'Extended Key Usage', | |
'2.5.29.46': u'Freshest CRL (a.k.a. Delta CRL Distribution Point)', | |
'2.5.29.54': u'Inhibit anyPolicy', | |
} | |
return mapping.get(oid, oid) | |
def parse_extension(oid, value): | |
""" | |
Try to parse the CRL extension, returning an empty dict if the | |
extension could not be parsed. | |
""" | |
result = {} | |
if oid == '1.3.6.1.4.1.311.21.4': | |
# https://tools.ietf.org/html/draft-deacon-lightweight-ocsp-profile-00#page-12 | |
next_publish = ber_decoder.decode( | |
value, asn1Spec=rfc2459.Time())[0] | |
result['nextPublish'] = _decode_time(next_publish, u'nextPublish') | |
return result | |
if oid == '2.5.29.20': | |
crl_number = ber_decoder.decode(value, asn1Spec=univ.Integer())[0] | |
result['crlNumber'] = int(crl_number) | |
return result | |
return result | |
extensions = [] | |
index = 0 | |
while True: | |
try: | |
extension = parsed_extensions.getComponentByPosition(index) | |
except IndexError: | |
break | |
oid = unicode(extension.getComponentByName('extnID')) | |
# Extension value is defined as an ANY type so we convert it to | |
# octets. | |
value = ber_decoder.decode( | |
extension.getComponentByName('extnValue'), | |
univ.OctetString(), | |
)[0] | |
extensions.append({ | |
'oid': oid, | |
'name': oid_to_name(oid), | |
'critical': bool(extension.getComponentByName('critical')), | |
'value': value, | |
'content': parse_extension(oid, value) | |
}) | |
index += 1 | |
return extensions | |
def crl_load(crl_data):
    """
    Parse the CRL with input in DER or PEM.

    Return the dictionary produced by `crl_parse`.

    Raise ServerException when `crl_data` is empty or contains only
    whitespace.
    """
    if not crl_data:
        raise ServerException(u'No data for CRL.')

    lines = crl_data.strip().splitlines()
    if not lines:
        # Data made only of whitespace has no first line to look at.
        raise ServerException(u'No data for CRL.')

    if 'BEGIN X509 CRL' in lines[0]:
        # We have a PEM data
        # Now convert to DER.
        content = []
        for line in lines[1:]:
            if 'END X509 CRL' in line:
                # End of content.
                break
            content.append(line)
        crl_der = pem_lines_to_der(content)
    else:
        # Anything without the PEM armor is handled as DER, using the
        # data exactly as it was received.
        crl_der = crl_data

    return crl_parse(crl_der)
def get_subject(components, error_location):
    """
    Return the (name, value) `components` of a subject, joined as a single
    string in the `/NAME=value` OpenSSL text format.

    `error_location` is only used in the error message.

    Raise ServerException when there are no components.
    """
    if not components:
        raise ServerException(
            u'Missing X509 subject in %s.' % (error_location,))

    return u''.join([
        u'/%s=%s' % (from_utf8(name), from_utf8(value))
        for name, value in components
        ])
def get_CRLs_from_CDP_extension(certificate):
    """
    Return a list of CRL location as advertised in the x509 extension for
    `certificate`.

    The list is empty when the certificate has no CDP extension.
    Only the first name of each distribution point is considered, and
    only if it is an URI.
    """
    locations = []
    count = certificate.get_extension_count()
    for position in range(count):
        extension = certificate.get_extension(position)
        if extension.get_short_name() == 'crlDistributionPoints':
            cdp = ber_decoder.decode(
                extension.get_data(),
                asn1Spec=rfc2459.CRLDistPointsSyntax(),
                )[0]
            for distribution_point in cdp:
                point = distribution_point.getComponentByName(
                    'distributionPoint')
                name = point.getComponentByName('fullName')[0].getComponent()
                if name.getTagSet()[0][2] == 6:
                    # For now only uniformResourceIdentifier are supported.
                    locations.append(name.asOctets().decode('ascii'))
    return locations
def get_common_name_from_subject(subject):
    """
    Return the Common Name value from the X509 OpenSSL `subject`.

    When OpenSSL fails to provide it via the `commonName` attribute, the
    value is searched in the subject components and `None` is returned
    when there is no `CN` component.
    """
    try:
        return subject.commonName
    except crypto.Error as error:
        failed_function = error.args[0][0][1]
        if failed_function != 'ASN1_mbstring_copy':
            raise error

    # On older versions of OpenSSL we can not decode newer x509
    # certificates using the commonName attribute.
    # This happens on Solaris 10 for example.
    for key, value in subject.get_components():
        if key == 'CN':
            return value.decode('utf-8')
def pem_lines_to_der(lines):
    """
    Return the ASN.1 serialization decoded from the BASE64 PEM `lines`.

    Lines should be passed without the armor.
    """
    # Surrounding newlines, tabs and spaces are not part of the content.
    stripped = [line.strip('\n\t\r ') for line in lines]
    return a2b_base64(''.join(stripped))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment