Skip to content

Instantly share code, notes, and snippets.

@adiroiban
Last active May 19, 2017 17:23
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save adiroiban/d4af3aa243924609cd54e504b9fa2895 to your computer and use it in GitHub Desktop.
Save adiroiban/d4af3aa243924609cd54e504b9fa2895 to your computer and use it in GitHub Desktop.
pyopenssl SSLContext.set_verify callback for CRL / CDP validation
class CRLValidator(object):
"""
Helper for validating certificates against CRL.
For now it only support a single CRL signed by a single CA.
It provides the `CRLValidator.validate(certificate)` method which can be
used as a callback for `SSLContext.set_verify`.
"""
_random = random
def __init__(self, scheduler, update_interval, event_emitter):
if not scheduler:
scheduler = reactor
self._scheduler = scheduler
self._emitEvent = event_emitter
# Time in seconds after which the CRL is loaded.
# 0 for auto-reload.
self._update_interval = update_interval
# HTTP client used to get CRL over HTTP.
self._http_agent = NonPersistentAgent()
self._resetBeforeStartState()
def _resetBeforeStartState(self):
"""
Set the validator state as before starting.
"""
# Deferred attached to the next update for the CRL location.
self._next_update_deferreds = {}
# Mapping from CA subject to CA cert used to validate CRL signature.
self._ca_certs = {}
self._running = False
# Mapping between CA subject and list of revoked ids.
self._revoked_serials = {}
# Mapping between CRL location to issuer.
self._crl_location_to_issuer = {}
# True when validator should auto-load CRL from CDP.
self._use_cdp = False
def start(self):
"""
Called after CRL validator was configured and can be used.
"""
if not self._ca_certs:
raise ServerException(u'CA was not set to validate the CRL.')
if not self._revoked_serials:
raise ServerException(u'No CRL was configured.')
self._running = True
def stop(self):
"""
Called when CRL validator should no longer be used.
"""
for location, deferred in self._next_update_deferreds.items():
if not deferred.called:
deferred.cancel()
self._resetBeforeStartState()
@property
def running(self):
"""
`True` when the validator should be used.
"""
return self._running
@defer.inlineCallbacks
def configure(self, ca, crls, certificate):
"""
Configure based on CA PEM file stored at `ca` path and load CRLs
from `crl` list.
Return a deferred which is fired when the CRL is configured.
"""
self._addCAFromPath(path=ca)
if TYPE_NAME.CRL_DISTRIBUTION_POINTS in crls:
deferred = self._configureCDP(start_certificate=certificate)
else:
deferred = self._addCRLs(crls=crls)
# Wait for initial CRL cache to be loaded.
yield deferred
def _addCAFromPath(self, path):
"""
Add CA certificates stored as PEM file on local filesystem to
validate the CRL.
"""
if self._ca_certs:
# For now we don't support multiple calls to this method since
# we don't need it.
# This can be change if we somehow need it.
raise ServerException(
u'Only a single CA file can be loaded for CRL validation.')
ca_segments = local_filesystem.getSegmentsFromRealPath(path)
if local_filesystem.isFolder(ca_segments):
raise ServerException(
u'CRL can only be used when CA certificates are stored '
u'in a single file.'
)
ca_file = None
try:
ca_file = local_filesystem.openFileForReading(ca_segments)
pem_objects = pem.parse(ca_file.read())
finally:
if ca_file:
ca_file.close()
ca_certs = {}
for pem_object in pem_objects:
if not isinstance(pem_object, pem.Certificate):
continue
cert = crypto.load_certificate(
crypto.FILETYPE_PEM, pem_object.as_bytes())
subject = x509.get_subject(
cert.get_subject().get_components(),
u'CA loaded from %s' % (path,),
)
ca_certs[subject] = cert
if not ca_certs:
raise ServerException(
u'CA file used to validated the CRL has no certificates.')
self._ca_certs.update(ca_certs)
def _addCRLs(self, crls):
"""
Add a list of CRLs to validate against.
Returns a deferred which is fired when the CRLs were loaded,
successfully or not.
On failure, only the error from the first CRL is raised.
If you want to catch errors from all CRLs call this separately with
a CLR at a time.
"""
def cb_update_crl(result):
get_method, location = result
issuer = self._crl_location_to_issuer.get(location, None)
if issuer:
# Location is already loaded. Nothing new to do.
return
deferred = self._updateCRL(get_method, location)
return deferred
deferreds = []
for crl in crls:
deferred = self._getCRLMethod(crl_configuration=crl)
# Do an initial load to forward exceptions to caller and
# then schedule the periodic updates.
deferred.addCallback(cb_update_crl)
deferreds.append(deferred)
def cb_list_result(results):
"""
This is fired from the DeferredList and is used to convert it
into a simple Deferred failure.
"""
for index, result in enumerate(results):
have_succeed, result_or_failure = result
if have_succeed:
continue
location = crls[index]
raise self._getCRLLoadErrorWithInvalidation(
result_or_failure.value, location)
deferred = defer.DeferredList(deferreds, consumeErrors=True)
deferred.addCallback(cb_list_result)
return deferred
def _configureCDP(self, start_certificate):
"""
Configure the validator to use CRL advertised by CDP.
An initial CRL is extracted from the CDP of the `start_certificate`
to cache the most probable CDP in use.
Returns a deferred which is fired when the initial CRL was loaded or
is errbacked on errors.
"""
self._use_cdp = True
crls = x509.get_CRLs_from_CDP_extension(certificate=start_certificate)
if not crls:
error = self._getCRLLoadErrorWithInvalidation(
error=ServerException(
u'No CDP extension defined for the local certificate.'),
location='crl-distribution-points',
)
return defer.fail(error)
# FIXME:3494:
# We should support multiple CRL locations and look at loading
# all the advertised CRL.
return self._addCRLs(crls=crls[:1])
def validate(self, peer_certificate):
"""
Check if certificate is valid and not part of a CRL.
Raise ServerException with details if certificate is not valid.
"""
if not self._running:
raise ServerException(u'CRL validation was not yet started.')
issuer = x509.get_subject(
peer_certificate.get_issuer().get_components(),
'peer certificate'
)
revoked_is = self._revoked_serials.get(issuer, _CRL_NOT_YET_LOADED)
if self._use_cdp:
# When CDP is used, clients should always use a certificate which
# is advertising a CDP.
crls = x509.get_CRLs_from_CDP_extension(peer_certificate)
if not crls:
raise ServerException(
u'CDP is in usage and peer certificate '
u'has no CDP extension.'
)
if revoked_is is _CRL_NOT_YET_LOADED:
if not self._use_cdp:
# CDP are not used and CRL for this issuer is not yet
# loaded.
raise ServerException(
u'CRL not yet loaded for %s.' % (issuer,))
# We are using CDP, and we don't have a CRL yet.
# So load the CRL if is not already loaded.
load_message = self._loadCDP(
crls=crls, peer_certificate=peer_certificate)
raise ServerException(load_message)
# We have a revocation list for this issuer.
if peer_certificate.get_serial_number() in revoked_is:
raise ServerException(u'Revoked certificate.')
def _loadCDP(self, crls, peer_certificate):
"""
Load a CRL at runtime, as advertised in the CDP extension.
Return a human readable message describing the action which was
taken.
"""
# FIXME:3494:
# We should support multiple CRL locations and look at loading
# all the advertised CRL.
crls = crls[:1]
if (
crls[0] in self._next_update_deferreds and
not self._next_update_deferreds[crls[0]].called
):
# CRL is in the processes of being updated so until it is
# loaded, we fail.
return u'CDP/CRL for the peer certificate is not loaded yet.'
# Make sure we have a subject for the peer certificate.
peer_subject = x509.get_subject(
peer_certificate.get_subject().get_components(),
'peer_certificate during CDP/CRL',
)
# No CRL yet for this peer certificate, so we trigger their loading.
deferred = self._addCRLs(crls=crls)
deferred.addErrback(lambda failure: emit_error_details(
error=failure.value,
event_id=u'20178',
data={'peer_subject': peer_subject},
emitter=self._emitEvent,
))
self._cdp_auto_load_deferred = deferred
# We fail this validation as we don't have yet a CRL.
return u'CDP/CRL for the peer certificate is being loaded.'
def _now(self):
"""
Return the current datetime in UTC.
"""
result = datetime.utcfromtimestamp(self._scheduler.seconds())
return result.replace(tzinfo=UTC)
def _getCRLLoadErrorWithInvalidation(self, error, location):
"""
Return a ServerError associated with CRL load operation and invalidate
the currently loaded CRL.
"""
self._invalidateCRL(location)
data = {
'uri': location,
'details': unicode(error),
}
return ServerError(u'20036', data=data)
def _invalidateCRL(self, location):
"""
Invalidate the CRL which is already loaded from location.
"""
try:
issuer = self._crl_location_to_issuer[location]
self._revoked_serials[issuer] = _CRL_NOT_YET_LOADED
except KeyError:
# It might be that we were not able to load the location so we
# don't know its issuer.
pass
def _getCRLMethod(self, crl_configuration):
"""
Return a deferred which is fired with tuple of
(get_method, location) for the single `crl_configuration`.
"""
get_method = None
location = crl_configuration
if not location:
return defer.fail(ServerException(
u'CRL location can not be empty.'))
url = urlparse(crl_configuration)
if crl_configuration[1:3] == ':\\':
# Windows absolute path without a scheme.
get_method = self._loadCRLFromPath
elif url.scheme == '':
# No scheme is fallback to local path.
get_method = self._loadCRLFromPath
elif url.scheme == 'file':
get_method = self._loadCRLFromPath
if url.netloc:
# we have a relative path or a Windows path and the final
# location is url.netloc + url.path
if not url.path:
# We have a windows path or a relative path comprised of
# a single segment.
# FIXME:3480:
# Might need to store it as absolute path.
location = url.netloc
else:
# We have Unix relative path.
# FIXME:3480:
# Might need to store it as absolute path.
location = url.netloc + url.path
else:
# We have absolute unix path.
location = url.path
elif url.scheme == 'http':
get_method = self._loadCRLFromURL
else:
return defer.fail(ServerException(
u'Unknown method for loading CRL: %s' % (url.scheme,)))
return defer.succeed((get_method, location))
@defer.inlineCallbacks
def _updateCRL(self, get_method, location):
"""
Load/reload revoked certificates from the configured CRL.
"""
deferred = defer.maybeDeferred(get_method, location)
# Let other know that we are loading a CRL for this location.
# It will be later overwritten by the schedule call, but in
# case someone else want to know if there is already a deferred for
# this location, it will check via this dict.
self._next_update_deferreds[location] = deferred
crl_data = yield deferred
if not crl_data:
raise ServerException(u'Empty CRL at the configured location.')
crl_details = x509.crl_load(crl_data)
issuer = crl_details['issuer']
try:
ca_cert = self._ca_certs[issuer]
except KeyError:
raise ServerException(
u'CRL signed by a CA which is not trusted: %s' % (issuer,))
try:
crypto.verify(
ca_cert,
crl_details['signature'],
crl_details['to-be-signed'],
crl_details['digest-algorithm'],
)
except crypto.Error as error:
raise ServerException(u'CRL with invalid signature: %s' % (error,))
new_revoked_certificates = []
revoked_certificates = crl_details['revoked']
for revoked in revoked_certificates:
new_revoked_certificates.append(revoked['serial'])
existing_issuer = self._crl_location_to_issuer.get(location, issuer)
if existing_issuer != issuer:
self._revoked_serials[existing_issuer] = _CRL_NOT_YET_LOADED
raise ServerException(
u'Reseting current CRL. '
u'Issuer of the CRL at %s was changed from "%s" to "%s".' % (
location, existing_issuer, issuer))
# Check to see that the current issuer is not already loaded from
# another location.
for other_location, other_issuer in (
self._crl_location_to_issuer.items()
):
if other_issuer == issuer and other_location != location:
# FIXME:3494:
# This should be flexible as we should allow CRL fallback and
# in that case we would have multiple CRL for a single issuer.
raise ServerException(
u'CRL for %s is already loaded from %s. '
u'It was tried to also be loaded from %s.' % (
issuer, other_location, location))
self._revoked_serials[issuer] = new_revoked_certificates
self._crl_location_to_issuer[location] = issuer
next_update = crl_details['next-update']
delay = self._getCRLUpdateDelay(crl_details)
self._scheduleCRLNextUpdate(delay, get_method, location, next_update)
update_datetime = self._now() + timedelta(seconds=delay)
self._emitEvent(u'20037', data={
'uri': location,
'issuer': issuer,
'count': len(new_revoked_certificates),
'update_datetime': update_datetime,
'update_seconds': delay,
'next_publish': crl_details['next-publish'],
'next_update': next_update,
})
def _loadCRLFromPath(self, path):
"""
Load the raw CRL from a local filesystem.
"""
crl_segments = local_filesystem.getSegmentsFromRealPath(path)
if local_filesystem.isFolder(crl_segments):
raise ServerException(
u'Loading CRL from directories is not supported.')
crl_file = local_filesystem.openFileForReading(crl_segments)
try:
crl_data = crl_file.read()
finally:
crl_file.close()
return crl_data
def _loadCRLFromURL(self, url):
"""
Load the raw CRL over HTTP.
"""
# FIXME:3465:
# HTTP request should be done via a redirection agent.
deferred = self._http_agent.request('GET', url.encode('utf-8'))
def cb_got_response(response):
if response.code != 200:
raise ServerException(
u'Failed to retrieve the CRL at %s over HTTP: %s %s' % (
url, response.code, response.phrase))
return self._http_agent.readBody(response)
deferred.addCallback(cb_got_response)
return deferred
def _getCRLUpdateDelay(self, crl_details):
"""
Return the seconds after which the CRL should be updated.
"""
next_update = crl_details['next-update']
next_publish = crl_details['next-publish']
if next_update:
now = self._now()
crl_delta = next_update - now
next_load = crl_delta.total_seconds()
if next_load <= 0:
raise ServerException(
u'CRL had expired on UTC %s. Now is %s' % (
next_update.isoformat(), now.isoformat()))
if next_publish:
if next_publish > next_update:
raise ServerException(
u'CRL next publish at %s UTC is after '
u'next update at %s UTC.' % (
next_publish.isoformat(), next_update.isoformat()))
# We have the CRL Next Publish extension, so use that value.
crl_delta = next_publish - now
next_publish_load = crl_delta.total_seconds()
if next_publish_load <= 0:
# We are pass the next publish time.
# Retry using the soft retry or next update,
# which comes first.
next_load = min(next_load, _CRL_SOFT_RELOAD_DELAY)
else:
next_load = next_publish_load
if self._update_interval:
# We have a pre-configured refresh rate.
next_load = self._update_interval
else:
# If the current CRL had expired, there is no next update to
# schedule.
if not next_update:
raise ServerException(u'CRL has no nextUpdate field.')
# Use the refresh value advertised in the certificate to
# compute the next refresh time.
# Refresh it with a random delay of up to 60 seconds so what we
# don't hit it at the exact refresh time.
next_load = next_load + self._random.randint(0, 60)
return next_load
def _scheduleCRLNextUpdate(self, delay, get_method, location, next_update):
"""
Called to schedule the next update of the CRL.
"""
def eb_update(failure, location):
if failure.check(defer.CancelledError):
# We are stopping so this is a normal way to signal.
return
if next_update <= self._now():
# The CRL is no longer valid.
self._invalidateCRL(location)
next_load = _CRL_SOFT_RELOAD_DELAY
else:
# Retry after the soft reload or at the final next update,
# which is sooner, so that we don't run with a loaded CRL
# past the Next Update date.
crl_delta = next_update - self._now()
# Delay an extra 1 second so that on fast system with low
# clock resolution we don't fail to often.
next_load = crl_delta.total_seconds() + 1
next_load = min(next_load, _CRL_SOFT_RELOAD_DELAY)
# Try reloading the CRL.
self._scheduleCRLNextUpdate(
delay, get_method, location, next_update)
self._emitEvent(u'20038', data={
'uri': location,
'next_load': next_load,
'details': unicode(failure.value),
})
deferred = task.deferLater(
clock=self._scheduler,
delay=delay,
callable=self._updateCRL,
get_method=get_method,
location=location,
)
deferred.addErrback(eb_update, location)
self._next_update_deferreds[location] = deferred
# Copyright (c) 2016 Adi Roiban.
# See LICENSE for details.
"""
Handling of X.509 files.
"""
from builtins import range
import datetime
from binascii import a2b_base64
from Crypto.Util import asn1
from OpenSSL import crypto
from pyasn1.codec.ber import decoder as ber_decoder
from pyasn1.error import PyAsn1Error
from pyasn1.type import univ
from pyasn1_modules import rfc2459
from chevah.server import from_utf8
from chevah.server.commons import UTC
from chevah.server.commons.exception import ServerException
OID_to_name = {
# DN oids.
(2, 5, 4, 3): 'CN', # common name
(2, 5, 4, 6): 'C', # country
(2, 5, 4, 7): 'L', # locality
(2, 5, 4, 8): 'ST', # stateOrProvince
(2, 5, 4, 10): 'O', # organization
(2, 5, 4, 11): 'OU', # organizationalUnit
(0, 9, 2342, 19200300, 100, 1, 25): 'DC', # domainComponent
(1, 2, 840, 113549, 1, 9, 1): 'emailAddress',
# Digest algorithms.
# See: http://www.alvestrand.no/objectid/1.2.840.113549.1.1.html
(1, 2, 840, 113549, 1, 1, 4): b'md5',
(1, 2, 840, 113549, 1, 1, 5): b'sha1',
(1, 2, 840, 113549, 1, 1, 11): b'sha256',
(1, 2, 840, 113549, 1, 1, 13): b'sha512',
}
name_to_OID = {value: key for key, value in OID_to_name.items()}
def _from_bits(bits):
"""
Return a string of bytes from a list of 0/1 bits.
"""
chars = []
for b in range(len(bits) / 8):
byte = bits[b * 8:(b + 1) * 8]
chars.append(chr(int(''.join([str(bit) for bit in byte]), 2)))
return ''.join(chars)
def _decode_time(obj, time_name):
"""
Decode x.509 time objects.
See RFC 5280.
It supports UTCTime only in GMT.
"""
if obj is None:
return None
time_obj = obj.getComponentByName('utcTime')
if time_obj is None:
# We don't have utcTime, so we try with generalized time.
time_obj = obj.getComponentByName('generalTime')
if time_obj is None:
# We should not hit this place as a certificate should be encoded
# with either URC or Generalized time.
raise ServerException(u'No time found in CRL for %s.' % (time_name,))
time_formats = {
# 4.1.2.5.1. UTCTime.
23: "%y%m%d%H%M%SZ",
# 4.1.2.5.2. GeneralizedTime
# ASN1 defines multiple formats for formats, RFC 5280 mandates a single
# format.
24: "%Y%m%d%H%M%SZ",
}
time_format = time_formats[time_obj.tagSet.uniq[1]]
return datetime.datetime.strptime(
time_obj.asOctets(), time_format).replace(tzinfo=UTC)
def _decode_rdn_components(rdn_sequence):
"""
Return a list with name and value of RelativeDistinguishedName.
"""
parts = []
index = 0
while True:
try:
component = rdn_sequence.getComponentByPosition(index)
except IndexError:
break
component = component.getComponentByPosition(0)
comp_type = component.getComponentByName('type').asTuple()
value = component.getComponentByName('value').asOctets()[2:]
name = OID_to_name[comp_type]
parts.append((name, value))
index += 1
return parts
def crl_parse(crl_der):
"""
Return a dictionary with the CRL attributes of the CRL passed as DER.
RFC 5280
CertificateList ::= SEQUENCE {
tbsCertList TBSCertList,
signatureAlgorithm AlgorithmIdentifier,
signatureValue BIT STRING }
TBSCertList ::= SEQUENCE {
version Version OPTIONAL,
-- if present, MUST be v2
signature AlgorithmIdentifier,
issuer Name,
thisUpdate Time,
nextUpdate Time OPTIONAL,
revokedCertificates SEQUENCE OF SEQUENCE {
userCertificate CertificateSerialNumber,
revocationDate Time,
crlEntryExtensions Extensions OPTIONAL
-- if present, version MUST be v2
} OPTIONAL,
crlExtensions [0] EXPLICIT Extensions OPTIONAL
-- if present, version MUST be v2
}
Version ::= INTEGER { v1(0), v2(1), v3(2) }
CertificateSerialNumber ::= INTEGER
Time ::= CHOICE {
utcTime UTCTime,
generalTime GeneralizedTime }
"""
try:
crl_asn1 = ber_decoder.decode(
crl_der, asn1Spec=rfc2459.CertificateList())[0]
except PyAsn1Error as error:
raise ServerException(u'Failed to decode CRL: %s' % (error.message))
to_be_signed = crl_asn1.getComponentByName('tbsCertList')
issuer = to_be_signed.getComponentByName('issuer').getComponentByName('')
this_update = to_be_signed.getComponentByName('thisUpdate')
next_update = to_be_signed.getComponentByName('nextUpdate')
signature_value = crl_asn1.getComponentByName('signature')
signature_algorithm = crl_asn1.getComponentByName('signatureAlgorithm')
algorithm = signature_algorithm.getComponentByName('algorithm').asTuple()
try:
digest_name = OID_to_name[algorithm]
except KeyError:
raise ServerException(
u'CRL signature algorithm not supported: %s' % (algorithm,))
revoked = to_be_signed.getComponentByName('revokedCertificates')
revoked_certificates = _parse_revoked(revoked)
parsed_extensions = to_be_signed.getComponentByName('crlExtensions')
extensions = _parse_extensions(parsed_extensions)
# We get the raw data to be signed as pyasn1 has no easy way to export
# the decoded data... other than re-encoding it, and we might re-encode
# it in a different order and the signature will be invalid.
crl_seq = asn1.DerSequence()
crl_seq.decode(crl_der)
# Look for next publish extension and expose it as root attribute.
next_publish = None
for extension in extensions:
if extension['name'] != 'CRL Next Publish':
continue
next_publish = extension['content']['nextPublish']
return {
'to-be-signed': crl_seq[0],
'issuer': get_subject(
_decode_rdn_components(issuer),
'loaded CRL issuer',
),
'signature': _from_bits(signature_value),
'digest-algorithm': digest_name,
'this-update': _decode_time(this_update, u'thisUpdate'),
'next-update': _decode_time(next_update, u'nextUpdate'),
'next-publish': next_publish,
'revoked': revoked_certificates,
'extensions': extensions,
}
def _parse_revoked(revoked):
"""
Return the list with certificates in the revocation list.
"""
if not revoked:
return []
revoked_certificates = []
index = 0
while True:
try:
revoked_certificate = revoked.getComponentByPosition(index)
except IndexError:
break
serial = revoked_certificate.getComponentByName('userCertificate')
revoked_certificates.append({
'serial': int(serial),
})
index += 1
return revoked_certificates
def _parse_extensions(parsed_extensions):
"""
Return the list of extensions as serialized in ASN.1 `parsed_extensions`
sequence.
"""
if not parsed_extensions:
return []
def oid_to_name(oid):
"""
Convert the CRL extension OID to a human readable name.
Return the same `oid` if extension is not known.
"""
mapping = {
# Microsoft CertSrv Infrastructure
# https://support.microsoft.com/en-us/kb/287547
'1.3.6.1.4.1.311.21.4': u'CRL Next Publish',
# Standard extensions.
# As defined in RFC 5280.
'2.5.29.9': u'Subject Directory Attributes',
'2.5.29.14': u'Subject Key Identifier',
'2.5.29.15': u'Key Usage',
'2.5.29.16': u'Private Key Usage Period',
'2.5.29.17': u'Subject Alternative Name',
'2.5.29.18': u'Issuer Alternative Name',
'2.5.29.19': u'Basic Constraints',
'2.5.29.20': u'CRL Number',
'2.5.29.21': u'Revocation Reason',
'2.5.29.24': u'Invalidity Date',
'2.5.29.27': u'Delta CRL Indicator',
'2.5.29.28': u'Issuing Distribution Point',
'2.5.29.29': u'Certificate Issuer',
'2.5.29.30': u'Name Constraints',
'2.5.29.31': u'CRL Distribution Points',
'2.5.29.32': u'Certificate Policies',
'2.5.29.33': u'Policy Mappings',
'2.5.29.35': u'Authority Key Identifier',
'2.5.29.36': u'Policy Constraints',
'2.5.29.37': u'Extended Key Usage',
'2.5.29.46': u'Freshest CRL (a.k.a. Delta CRL Distribution Point)',
'2.5.29.54': u'Inhibit anyPolicy',
}
return mapping.get(oid, oid)
def parse_extension(oid, value):
"""
Try to parse the CRL extension, returning an empty dict if the
extension could not be parsed.
"""
result = {}
if oid == '1.3.6.1.4.1.311.21.4':
# https://tools.ietf.org/html/draft-deacon-lightweight-ocsp-profile-00#page-12
next_publish = ber_decoder.decode(
value, asn1Spec=rfc2459.Time())[0]
result['nextPublish'] = _decode_time(next_publish, u'nextPublish')
return result
if oid == '2.5.29.20':
crl_number = ber_decoder.decode(value, asn1Spec=univ.Integer())[0]
result['crlNumber'] = int(crl_number)
return result
return result
extensions = []
index = 0
while True:
try:
extension = parsed_extensions.getComponentByPosition(index)
except IndexError:
break
oid = unicode(extension.getComponentByName('extnID'))
# Extension value is defined as an ANY type so we convert it to
# octets.
value = ber_decoder.decode(
extension.getComponentByName('extnValue'),
univ.OctetString(),
)[0]
extensions.append({
'oid': oid,
'name': oid_to_name(oid),
'critical': bool(extension.getComponentByName('critical')),
'value': value,
'content': parse_extension(oid, value)
})
index += 1
return extensions
def crl_load(crl_data):
"""
Parse the CRL with input in DER or PEM.
"""
if not crl_data:
raise ServerException(u'No data for CRL.')
lines = crl_data.strip().splitlines()
if 'BEGIN X509 CRL' in lines[0]:
# We have a PEM data
# Now convert to DER.
content = []
for line in lines[1:]:
if 'END X509 CRL' in line:
# End of content.
break
content.append(line)
crl_der = pem_lines_to_der(content)
else:
crl_der = crl_data
return crl_parse(crl_der)
def get_subject(components, error_location):
"""
Return subject components as string in OpenSSL text format.
"""
if not components:
raise ServerException(
u'Missing X509 subject in %s.' % (error_location,))
parts = []
for name, value in components:
parts.append(u'/%s=%s' % (from_utf8(name), from_utf8(value)))
return u''.join(parts)
def get_CRLs_from_CDP_extension(certificate):
"""
Return a list of CRL location as advertised in the x509 extension for
`certificate`
"""
result = []
for index in range(0, certificate.get_extension_count()):
extension = certificate.get_extension(index)
if extension.get_short_name() != 'crlDistributionPoints':
continue
data = extension.get_data()
cdp = ber_decoder.decode(
data, asn1Spec=rfc2459.CRLDistPointsSyntax())[0]
for distribution_point in cdp:
dp = distribution_point.getComponentByName('distributionPoint')
dp_name = dp.getComponentByName('fullName')[0].getComponent()
if dp_name.getTagSet()[0][2] != 6:
# For now only uniformResourceIdentifier are supported.
continue
result.append(dp_name.asOctets().decode('ascii'))
return result
def get_common_name_from_subject(subject):
"""
Return the Common Name value from the X509 OpenSSL subject.
"""
try:
return subject.commonName
except crypto.Error as error:
if error.args[0][0][1] != 'ASN1_mbstring_copy':
raise error
# On older versions of OpenSSL we ca not decode newer x509
# certificates using the commoneName attribute.
# This happens on Solaris 10 for example.
components = subject.get_components()
for key, value in components:
if key != 'CN':
continue
return value.decode('utf-8')
def pem_lines_to_der(lines):
"""
Convert the BASE64 PEM lines to ASN.1 serialization.
Lines should be passed without the armor.
"""
content = []
for line in lines:
# Accumulate content without newlines or spaces.
content.append(line.strip('\n\t\r '))
return a2b_base64(''.join(content))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment