# Source: GitHub gist recalde/9a0c7ad381ff68e2184429599c049758
# (author @recalde, last active February 25, 2024).
import base64
import json
import logging
import os

import boto3
import jwt
import requests
from jwt.algorithms import RSAAlgorithm
# Module-wide logging goes through the root logger. INFO is the default
# verbosity; switch the level name to DEBUG, WARNING, etc. as needed.
logger = logging.getLogger()
logger.setLevel("INFO")
def lambda_handler(event, context):
    """Validate the caller's JWT against Okta's keys and return an IAM policy.

    The token is read from ``event['authorizationToken']``. Its signing key is
    looked up by ``kid`` in the JWKS served at ``{OKTA_API_ENDPOINT}/v1/keys``,
    then the RS256 signature, the audience (the KMS-decrypted client id) and
    the issuer (``OKTA_API_ENDPOINT``) are verified.

    Environment:
        OKTA_API_ENDPOINT: Base URL used as the JWKS prefix and expected issuer.
        ENCRYPTED_CLIENT_ID: Base64-encoded KMS ciphertext of the client id.

    Args:
        event: Invocation event; must contain ``authorizationToken`` and
            ``methodArn``.
        context: Lambda context object (unused).

    Returns:
        The dict built by ``generate_policy_document``: an ``Allow`` policy
        for ``methodArn`` when the token validates, otherwise ``Deny``
        (including when no published key matches the token's ``kid``).

    Raises:
        KeyError: If a required environment variable or event field is missing.
        Exception: Errors from decoding or KMS-decrypting the client id
            propagate unchanged.
    """
    # Missing configuration is allowed to raise so that a misdeployment is
    # loud instead of silently denying every request.
    okta_api_endpoint = os.environ['OKTA_API_ENDPOINT']
    encrypted_client_id = os.environ['ENCRYPTED_CLIENT_ID']

    logger.info("Starting Lambda function to validate JWT token")

    # Environment variables are text, so the ciphertext has to be turned into
    # bytes before KMS can decrypt it; bytes(<str>) raises TypeError on
    # Python 3.
    # NOTE(review): assumes the variable holds base64-encoded ciphertext --
    # confirm against how this deployment stores it.
    # The client secret is not needed to verify an RS256 signature, so it is
    # not decrypted here.
    kms = boto3.client('kms')
    client_id = kms.decrypt(
        CiphertextBlob=base64.b64decode(encrypted_client_id)
    )['Plaintext'].decode('utf-8')

    token = event['authorizationToken']
    method_arn = event['methodArn']

    try:
        # Retrieve Okta's public keys. The timeout keeps a stalled endpoint
        # from hanging the invocation; a non-2xx reply is surfaced as an error
        # rather than parsed as if it were a key set.
        jwks_response = requests.get(f"{okta_api_endpoint}/v1/keys", timeout=5)
        jwks_response.raise_for_status()
        jwks = jwks_response.json()

        # Read the header without verification only to learn which key
        # signed the token; nothing from it is trusted beyond the lookup.
        kid = jwt.get_unverified_header(token).get('kid')
        signing_jwk = None
        if kid is not None:
            signing_jwk = next(
                (key for key in jwks['keys'] if key.get('kid') == kid),
                None,
            )

        if signing_jwk is None:
            # Without an explicit Deny here the handler would return None.
            logger.warning(
                "No JWKS key matches token kid %r; denying access", kid
            )
            return generate_policy_document('user', 'Deny', method_arn)

        # Pass the published JWK through as-is: optional members such as
        # 'use' may be absent and must not cause a KeyError.
        jwt.decode(
            token,
            RSAAlgorithm.from_jwk(json.dumps(signing_jwk)),
            algorithms=['RS256'],
            audience=client_id,
            issuer=okta_api_endpoint,
        )
    except Exception as e:
        # Top-level boundary: any validation failure results in a Deny.
        logger.exception("Token validation error: %s", e)
        return generate_policy_document('user', 'Deny', method_arn)

    logger.info("JWT token validation successful")
    return generate_policy_document('user', 'Allow', method_arn)
def generate_policy_document(principal_id, effect, resource):
    """Build the authorizer response dict for a principal.

    Args:
        principal_id: Value stored under ``'principalId'``.
        effect: Policy effect (the handler passes ``'Allow'`` or ``'Deny'``).
        resource: Resource the statement applies to (the handler passes the
            event's ``methodArn``).

    Returns:
        A dict with ``'principalId'`` and, when both ``effect`` and
        ``resource`` are truthy, a ``'policyDocument'`` containing a single
        ``execute-api:Invoke`` statement. If either is falsy the policy
        document is omitted.
    """
    auth_response = {'principalId': principal_id}
    if not (effect and resource):
        return auth_response

    statement = {
        'Action': 'execute-api:Invoke',
        'Effect': effect,
        'Resource': resource,
    }
    auth_response['policyDocument'] = {
        'Version': '2012-10-17',
        'Statement': [statement],
    }
    return auth_response
# Additional logging can be added to other functions as needed.