Skip to content

Instantly share code, notes, and snippets.

@andyeff
Created February 10, 2022 16:31
Show Gist options
  • Save andyeff/9b20f6e69c1948bbfa3b59a1444a74c2 to your computer and use it in GitHub Desktop.
Save andyeff/9b20f6e69c1948bbfa3b59a1444a74c2 to your computer and use it in GitHub Desktop.
Python SSL Cert Checker for AWS Lambda
"""
SSL Cert inspection function
OpenSSL is available from the PyOpenSSL package
"""
import json
import logging
from datetime import datetime
import boto3
from botocore.exceptions import ClientError
from OpenSSL import crypto
import base64
# Set up some initial logging
logging.basicConfig(level=logging.NOTSET)
logger = logging.getLogger(__name__)
def lambda_handler(event, context):
cert_secret = get_secret("my/aws/secret/cert")
mycert = cert_secret["cert"]
mykey = cert_secret["key"]
# Cert is stored in AWS Secrets base64 encoded, so lets do the decode-dance
cert_b64b = mycert.encode("ascii")
key_b64b = mykey.encode("ascii")
cert_b = base64.b64decode(cert_b64b)
key_b = base64.b64decode(key_b64b)
plaincert = cert_b.decode("ascii")
plainkey = key_b.decode("ascii")
# Write the files to "disk" so it can be used as a client cert by Requests, for example
with open("/tmp/cert.crt", "w") as file:
file.writelines(plaincert)
with open("/tmp/key.pem", "w") as file:
file.writelines(plainkey)
cert = crypto.load_certificate(crypto.FILETYPE_PEM, plaincert)
# Get expiry date
cert_expiry = cert.get_notAfter()
cert_expiry = cert_expiry.decode("utf-8")
cert_expiry = datetime.strptime(cert_expiry, "%Y%m%d%H%M%SZ")
cert_expiry_str = datetime.strftime(cert_expiry, "%m/%d/%Y, %H:%M:%S")
# Check if expired
cert_hasexpired = cert.has_expired()
# Get subject
cert_subject = cert.get_subject()
cstr = str(cert_subject)
# Sample return response
return {"certExpiry": cert_expiry_str, "expiryStatus": cert_hasexpired, "subject": cstr}
# --- helper functions
def get_secret(requested_secret: str) -> dict:
"""
get_db_secret:
takes a string, returns a dict of the secrets contents
secret is parsed to ensure it contains all required fields
for RDS connectivity
"""
session = boto3.session.Session()
client = session.client(service_name="secretsmanager", region_name="eu-west-2")
try:
get_secret_value_response = client.get_secret_value(SecretId=requested_secret)
except ClientError as error:
logger.error(f"Error in obtaining secret {requested_secret}")
if error.response["Error"]["Code"] == "AccessDeniedException":
logger.error("Access failure reading secrets")
elif error.response["Error"]["Code"] == "InvalidRequestException":
logger.error("Invalid Request - confirm KMS is valid")
elif error.response["Error"]["Code"] == "ResourceNotFoundException":
logger.error(f"Could not find requested secret {requested_secret}")
else:
logger.error(f"Error while obtaining secret {requested_secret}")
logger.error(error)
raise error
secretstr = get_secret_value_response["SecretString"]
secret = json.loads(secretstr)
return secret
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment