Skip to content

Instantly share code, notes, and snippets.

@kmkale
Last active January 3, 2023 16:08
Show Gist options
  • Save kmkale/27bb96c2ecfa5c31d64a6919f3280731 to your computer and use it in GitHub Desktop.
Save kmkale/27bb96c2ecfa5c31d64a6919f3280731 to your computer and use it in GitHub Desktop.
AWS API Gateway mTLS Lambda authoriser for certificate revocation checks with OCSP & CRL
import json
import os
from certvalidator import CertificateValidator, ValidationContext, errors
import boto3
from asn1crypto import pem
'''
Load the truststore from S3. This is done outside the handler function so that it is loaded only on a cold start.
If the truststore contents change, you need to update the Lambda env var 'TRUSTSTORE_FILE_VERSIONID'
with the new file's versionId. Also update the same value in 'API Gateway > Custom domain names > Domain details > Truststore version' and wait until Status becomes Available.
If API Gateway finds a problem with the truststore, such as an incomplete certificate chain, it will display a warning. The warning details tell you which certificate has the problem and what the problem is. You need to fix the truststore chain until this warning goes away.
'''
# Module-level setup: runs once when the module is imported, not on every
# handler invocation.
s3_client = boto3.client('s3')

# Location of the truststore object, taken from the function's environment.
bucket = os.environ.get('TRUSTSTORE_BUCKET')
key = os.environ.get('TRUSTSTORE_FILENAME')
versionId = os.environ.get('TRUSTSTORE_FILE_VERSIONID')

# Use only the last path component of the key for the local file name. A key
# with a prefix (e.g. "certs/truststore.pem") would otherwise point into a
# sub-directory of /tmp that does not exist, and the download would fail.
# An unset key is passed through unchanged so the failure still surfaces from
# the download call below.
download_path = '/tmp/{}'.format(os.path.basename(key) if key else key)
s3_client.download_file(bucket, key, download_path, ExtraArgs={'VersionId': versionId})

# Every PEM block found in the downloaded file becomes one trust root; only the
# DER bytes of each block are kept.
with open(download_path, 'rb') as f:
    trust_roots = [
        der_bytes
        for _, _, der_bytes in pem.unarmor(f.read(), multiple=True)
    ]
def lambda_handler(event, context):
    """Authorise an mTLS request by validating the client certificate.

    The PEM certificate is read from
    ``event["requestContext"]["authentication"]["clientCert"]["clientCertPem"]``
    and validated against the module-level ``trust_roots`` with revocation
    checking in "hard-fail" mode.

    Args:
        event: The authoriser event. It is logged in full as JSON.
        context: The Lambda runtime context (unused).

    Returns:
        dict: ``{"isAuthorized": bool, "context": {"exception": ...}}``.
        ``exception`` is ``str(exc.args)`` of the error that caused the
        denial, or ``None`` when the certificate was accepted.
    """
    print("Received event: " + json.dumps(event, indent=2))
    try:
        # Get the client cert from the Lambda event. This happens inside the
        # try block so that an event without client-certificate details is
        # denied instead of crashing the authoriser.
        cert = event["requestContext"]["authentication"]["clientCert"]["clientCertPem"].encode()

        # hard-fail mode: any error in checking revocation is considered a
        # failure. However, if there is no known source of revocation
        # information, it is not considered a failure. This allows us to keep
        # using self-signed certs too.
        # Named validation_context so the Lambda `context` argument is not
        # shadowed.
        validation_context = ValidationContext(
            allow_fetching=True,
            revocation_mode="hard-fail",
            trust_roots=trust_roots,
        )
        validator = CertificateValidator(cert, validation_context=validation_context)
        # Third positional argument kept exactly as in the original call;
        # presumably certvalidator's `extended_optional` flag - confirm
        # against the library documentation.
        validator.validate_usage(
            {'digital_signature', 'key_encipherment'},
            {'server_auth', 'client_auth'},
            True,
        )
    except Exception as inst:
        # Fail closed: any error, whether from parsing the event or from
        # validation, results in a deny response carrying the error details.
        print(inst)
        print("The certificate could not be validated")
        return {
            # A real boolean, not the string "false": the simple authoriser
            # response format expects a JSON boolean here.
            "isAuthorized": False,
            "context": {
                "exception": str(inst.args)
            }
        }
    else:
        print("The certificate is ok")
        return {
            "isAuthorized": True,
            "context": {
                "exception": None
            }
        }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment