# healthlake/main.tf
## HealthLake ##
locals {
  # Base URL of the Cognito hosted domain; every OAuth endpoint advertised in
  # the datastore metadata below is a path underneath it.
  smart_cognito_base_url = "https://${aws_cognito_user_pool_domain.this.domain}.auth.${data.aws_region.current.name}.amazoncognito.com"
}

# FHIR R4 datastore with SMART on FHIR authorization. Bearer tokens are handed
# to the token-validator Lambda referenced by idp_lambda_arn.
resource "awscc_healthlake_fhir_datastore" "this" {
  datastore_name         = "my-fhir-datastore"
  datastore_type_version = "R4"

  # Preload the datastore with the SYNTHEA data set.
  preload_data_config = {
    preload_data_type = "SYNTHEA"
  }

  # Encrypt at rest with an AWS-owned key (no customer-managed key id).
  sse_configuration = {
    kms_encryption_config = {
      cmk_type   = "AWS_OWNED_KMS_KEY"
      kms_key_id = null
    }
  }

  identity_provider_configuration = {
    authorization_strategy             = "SMART_ON_FHIR_V1"
    fine_grained_authorization_enabled = true
    idp_lambda_arn                     = aws_lambda_function.token_validator.arn

    # Authorization-server discovery document, passed as a JSON string.
    metadata = jsonencode({
      # Endpoints on the Cognito hosted domain
      issuer                 = local.smart_cognito_base_url
      authorization_endpoint = "${local.smart_cognito_base_url}/oauth2/authorize"
      token_endpoint         = "${local.smart_cognito_base_url}/oauth2/token"
      registration_endpoint  = "${local.smart_cognito_base_url}/oauth2/register"
      management_endpoint    = "${local.smart_cognito_base_url}/oauth2/userInfo"
      introspection_endpoint = "${local.smart_cognito_base_url}/oauth2/introspect"
      revocation_endpoint    = "${local.smart_cognito_base_url}/oauth2/revoke"

      # Signing keys are served by the user pool itself, not the hosted domain.
      jwks_uri = "https://cognito-idp.${data.aws_region.current.name}.amazonaws.com/${aws_cognito_user_pool.main.id}/.well-known/jwks.json"

      # Supported OAuth / OIDC features
      response_types_supported = ["code", "token"]
      response_modes_supported = ["query", "fragment", "form_post"]
      subject_types_supported  = ["public"]
      grant_types_supported = [
        "authorization_code",
        "implicit",
        "refresh_token",
        "password",
        "client_credentials",
      ]
      scopes_supported = [
        "openid",
        "profile",
        "email",
        "phone",
        "launch/patient",
        "system/*.*",
        "patient/*.read",
      ]
      claims_supported = [
        "ver",
        "jti",
        "iss",
        "aud",
        "iat",
        "exp",
        "cid",
        "uid",
        "scp",
        "sub",
      ]
      code_challenge_methods_supported = ["S256"]

      # Client authentication methods
      token_endpoint_auth_methods_supported = [
        "client_secret_basic",
        "client_secret_post",
      ]
      revocation_endpoint_auth_methods_supported = [
        "client_secret_basic",
        "client_secret_post",
        "client_secret_jwt",
        "private_key_jwt",
        "none",
      ]

      # Request objects
      request_parameter_supported = true
      request_object_signing_alg_values_supported = [
        "HS256",
        "HS384",
        "HS512",
        "RS256",
        "RS384",
        "RS512",
        "ES256",
        "ES384",
        "ES512",
      ]

      # SMART capabilities
      capabilities = [
        "launch-ehr",
        "sso-openid-connect",
        "client-public",
      ]
    })
  }

  # Later differences in identity_provider_configuration are not treated as drift.
  lifecycle {
    ignore_changes = [identity_provider_configuration]
  }
}
## Cognito ##
# User pool acting as the OAuth2 authorization server for the datastore.
resource "aws_cognito_user_pool" "main" {
  name                = "my-cognito-user-pool"
  deletion_protection = "INACTIVE" # or "ACTIVE"
  mfa_configuration   = "OFF"      # or "ON" / "OPTIONAL"

  # Sign-in and verification both use the e-mail address.
  alias_attributes         = ["email"]
  auto_verified_attributes = ["email"]

  username_configuration {
    case_sensitive = false
  }

  # Only administrators may create users (no self sign-up).
  admin_create_user_config {
    allow_admin_create_user_only = true
  }

  password_policy {
    minimum_length                   = 8
    require_uppercase                = true
    require_lowercase                = true
    require_numbers                  = true
    require_symbols                  = true
    temporary_password_validity_days = 7
  }

  # Required, mutable "email" attribute.
  schema {
    name                     = "email"
    attribute_data_type      = "String"
    required                 = true
    mutable                  = true
    developer_only_attribute = false

    string_attribute_constraints {
      min_length = "0"
      max_length = "2048"
    }
  }

  account_recovery_setting {
    recovery_mechanism {
      name     = "verified_email"
      priority = 1
    }
  }

  user_attribute_update_settings {
    attributes_require_verification_before_update = ["email"]
  }

  email_configuration {
    email_sending_account = "COGNITO_DEFAULT"
  }

  verification_message_template {
    default_email_option = "CONFIRM_WITH_CODE"
  }
}
# Hosted domain prefix for the user pool; the OAuth endpoints in this file are
# built as https://<domain>.auth.<region>.amazoncognito.com/...
# NOTE(review): the prefix is hard-coded; confirm it is available in the target region.
resource "aws_cognito_user_pool_domain" "this" {
  user_pool_id = aws_cognito_user_pool.main.id
  domain       = "smart-fhir-test"
}
# Resource server "launch" with scope "patient"; the client requests it as "launch/patient".
resource "aws_cognito_resource_server" "launch" {
  user_pool_id = aws_cognito_user_pool.main.id
  name         = "launch"
  identifier   = "launch"

  scope {
    scope_name        = "patient"
    scope_description = "Request patient data"
  }
}
# Resource server "system" with scope "*.*"; the client requests it as "system/*.*".
resource "aws_cognito_resource_server" "system" {
  user_pool_id = aws_cognito_user_pool.main.id
  name         = "system"
  identifier   = "system"

  scope {
    scope_name        = "*.*"
    scope_description = "Full system access"
  }
}
# Resource server "patient" with scope "*.read"; the client requests it as "patient/*.read".
resource "aws_cognito_resource_server" "patient" {
  user_pool_id = aws_cognito_user_pool.main.id
  name         = "patient"
  identifier   = "patient"

  scope {
    scope_name        = "*.read"
    scope_description = "Read patient data"
  }
}
# Confidential app client (secret generated) configured for the
# client-credentials flow against the three custom resource-server scopes.
resource "aws_cognito_user_pool_client" "client" {
  user_pool_id                  = aws_cognito_user_pool.main.id
  name                          = "my-cognito-client"
  generate_secret               = true
  prevent_user_existence_errors = "ENABLED"
  supported_identity_providers  = ["COGNITO"]

  # OAuth configuration
  allowed_oauth_flows_user_pool_client = true
  allowed_oauth_flows                  = ["client_credentials"] # ["code"] for auth code

  # scopes for client credentials flow
  allowed_oauth_scopes = [
    "launch/patient",
    "system/*.*",
    "patient/*.read",
  ]
  # scopes for auth code flow
  # allowed_oauth_scopes = [
  #   "openid",
  #   "profile",
  #   "email",
  #   "phone",
  #   "launch/patient",
  #   "system/*.*",
  #   "patient/*.read"
  # ]

  callback_urls = ["https://localhost"]
  logout_urls   = []

  # Token lifetimes: every value below is expressed in minutes (60 = 1 hour).
  token_validity_units {
    access_token  = "minutes"
    id_token      = "minutes"
    refresh_token = "minutes"
  }
  access_token_validity  = 60
  id_token_validity      = 60
  refresh_token_validity = 60

  # User attributes the client may read.
  read_attributes = [
    "address", "birthdate", "email", "email_verified", "family_name",
    "gender", "given_name", "locale", "middle_name", "name", "nickname",
    "phone_number", "phone_number_verified", "picture", "preferred_username",
    "profile", "updated_at", "website", "zoneinfo",
  ]

  # User attributes the client may write.
  write_attributes = [
    "address", "birthdate", "email", "family_name", "gender", "given_name",
    "locale", "middle_name", "name", "nickname", "phone_number", "picture",
    "preferred_username", "profile", "updated_at", "website", "zoneinfo",
  ]

  # The custom scopes above must exist before the client can reference them.
  depends_on = [
    aws_cognito_resource_server.launch,
    aws_cognito_resource_server.system,
    aws_cognito_resource_server.patient,
  ]
}
# Pre-provisioned test user with a pre-verified e-mail address.
# NOTE(review): the password is a literal in source and will also be stored in
# Terraform state; acceptable only for throwaway test environments.
resource "aws_cognito_user" "test_user" {
  user_pool_id = aws_cognito_user_pool.main.id
  username     = "test"
  password     = "Password1234!"

  attributes = {
    email              = "test@test.com"
    email_verified     = true
    preferred_username = "testadmin"
  }
}
## Lambda ##
# Region of the configured AWS provider; used to build the Cognito endpoint URLs.
data "aws_region" "current" {
}

# Identity (account id) of the credentials Terraform is running with.
data "aws_caller_identity" "current" {
}
# Zips the Lambda package directory (handler plus vendored dependencies) into
# the deployment archive consumed by the Lambda function.
# NOTE(review): both paths are relative to the working directory Terraform is
# run from; confirm this is always the directory containing main.tf.
data "archive_file" "lambda_zip" {
  type        = "zip"
  source_dir  = "lambda/package"
  excludes    = ["__pycache__", "*.pyc", "*.dist-info"]
  output_path = "lambda/lambda_function.zip"
}
# Token-validator Lambda that HealthLake invokes for every SMART on FHIR request.
resource "aws_lambda_function" "token_validator" {
  function_name = "smart-fhir-auth-lambda"
  role          = aws_iam_role.lambda_role.arn
  handler       = "lambda_function.lambda_handler"
  runtime       = "python3.11"
  architectures = ["x86_64"]
  timeout       = 60
  memory_size   = 512

  filename = data.archive_file.lambda_zip.output_path
  # Take the hash from the archive data source instead of calling
  # filebase64sha256() on the output path: the function is evaluated before the
  # zip has been written, so it fails on a clean checkout where the archive does
  # not exist yet. The data-source attribute is computed from the archive
  # Terraform actually built.
  source_code_hash = data.archive_file.lambda_zip.output_base64sha256

  environment {
    variables = {
      CLIENT_ID     = aws_cognito_user_pool_client.client.id
      CLIENT_SECRET = aws_cognito_user_pool_client.client.client_secret
      JWKS_URI      = "https://cognito-idp.${data.aws_region.current.name}.amazonaws.com/${aws_cognito_user_pool.main.id}/.well-known/jwks.json"
      # Role ARN the handler returns to HealthLake as "iamRoleARN".
      USER_ROLE_ARN = aws_iam_role.healthlake_service_role.arn
      USER_POOL_ID  = aws_cognito_user_pool.main.id
    }
  }
}
# Role assumable by the HealthLake service; its ARN is passed to the token
# validator Lambda as USER_ROLE_ARN.
resource "aws_iam_role" "healthlake_service_role" {
  name        = "my-healthlake-service-role"
  description = "Service role for AWS HealthLake FHIR operations"

  # Trust policy: only healthlake.amazonaws.com may assume this role.
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect    = "Allow"
        Action    = "sts:AssumeRole"
        Principal = { Service = "healthlake.amazonaws.com" }
      },
    ]
  })
}
# Attaches the AWS-managed AmazonHealthLakeFullAccess policy to the service role.
resource "aws_iam_role_policy_attachment" "healthlake_policy" {
  policy_arn = "arn:aws:iam::aws:policy/AmazonHealthLakeFullAccess"
  role       = aws_iam_role.healthlake_service_role.name
}
# Execution role for the token-validator Lambda.
resource "aws_iam_role" "lambda_role" {
  name = "smart-fhir-auth-lambda-role"

  # Trust policy: only the Lambda service may assume this role.
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect    = "Allow"
        Principal = { Service = "lambda.amazonaws.com" }
        Action    = "sts:AssumeRole"
      },
    ]
  })
}
# Attaches the AWS-managed AWSLambdaBasicExecutionRole policy to the Lambda's
# execution role.
resource "aws_iam_role_policy_attachment" "lambda_basic" {
  role       = aws_iam_role.lambda_role.name
  policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
}
# Inline policy on the Lambda role allowing cognito-idp:GetUser on any resource.
resource "aws_iam_role_policy" "cognito_access" {
  role = aws_iam_role.lambda_role.id
  name = "smart-fhir-auth-lambda-cognito-access"

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect   = "Allow"
        Action   = ["cognito-idp:GetUser"]
        Resource = "*"
      },
    ]
  })
}
# Allows the HealthLake service to invoke the token-validator Lambda.
resource "aws_lambda_permission" "healthlake" {
  statement_id  = "healthlake"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.token_validator.function_name
  principal     = "healthlake.amazonaws.com"

  # Confused-deputy protection: without a source condition, the service
  # principal could invoke this function on behalf of any AWS account.
  # Restrict the grant to calls made for this account.
  source_account = data.aws_caller_identity.current.account_id
}
# Exposes the three resource servers keyed by their identifier.
# NOTE(review): `.id` is the resource server's id, not a scope string such as
# "system/*.*"; if scope strings are intended, `scope_identifiers` may be the
# attribute to export. Confirm against the provider documentation.
output "cognito_scopes" {
  value = {
    launch  = aws_cognito_resource_server.launch.id
    patient = aws_cognito_resource_server.patient.id
    system  = aws_cognito_resource_server.system.id
  }
  description = "Cognito scopes"
}
# OAuth endpoints on the hosted Cognito domain, plus the user pool's JWKS URL.
output "cognito_oauth_endpoints" {
  value = {
    # Hosted-domain endpoints
    authorization = "https://${aws_cognito_user_pool_domain.this.domain}.auth.${data.aws_region.current.name}.amazoncognito.com/oauth2/authorize"
    token         = "https://${aws_cognito_user_pool_domain.this.domain}.auth.${data.aws_region.current.name}.amazoncognito.com/oauth2/token"
    userinfo      = "https://${aws_cognito_user_pool_domain.this.domain}.auth.${data.aws_region.current.name}.amazoncognito.com/oauth2/userInfo"

    # Signing keys published by the user pool
    jwks = "https://cognito-idp.${data.aws_region.current.name}.amazonaws.com/${aws_cognito_user_pool.main.id}/.well-known/jwks.json"
  }
  description = "OAuth endpoints for Cognito"
}
# Base URL of the Cognito hosted domain (no path component).
output "cognito_domain" {
  value       = "https://${aws_cognito_user_pool_domain.this.domain}.auth.${data.aws_region.current.name}.amazoncognito.com"
  description = "Cognito domain"
}
# App-client id and secret; marked sensitive so the values are redacted in
# normal plan/apply output.
output "cognito_client_credentials" {
  sensitive   = true
  description = "Cognito Client credentials"

  value = {
    client_id     = aws_cognito_user_pool_client.client.id
    client_secret = aws_cognito_user_pool_client.client.client_secret
  }
}
# Endpoint of the HealthLake FHIR datastore, as reported by the provider.
output "datastore_endpoint" {
  value = awscc_healthlake_fhir_datastore.this.datastore_endpoint
}
# ARN of the HealthLake FHIR datastore, as reported by the provider.
output "datastore_arn" {
  value = awscc_healthlake_fhir_datastore.this.datastore_arn
}
# healthlake/lambda/package/lambda_function.py
import base64
import logging
import json
import os
import urllib.request
from typing import Dict, Any
from datetime import datetime
import time
from jose import jwt
from jose.exceptions import JWTError
# Root logger at INFO so handler messages are emitted.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Required configuration, injected as Lambda environment variables.
# A missing variable raises KeyError at import time, before any request is handled.
(
    CLIENT_ID,
    CLIENT_SECRET,
    JWKS_URI,
    USER_ROLE_ARN,
    USER_POOL_ID,
) = (
    os.environ[_name]
    for _name in ('CLIENT_ID', 'CLIENT_SECRET', 'JWKS_URI', 'USER_ROLE_ARN', 'USER_POOL_ID')
)
class TokenValidationError(Exception):
    """Raised when a bearer token cannot be decoded, verified, or fails claim checks."""
def validate_token_claims(decoded_token: Dict[str, Any], datastore_endpoint: str) -> Dict[str, Any]:
    """
    Map verified JWT claims onto HealthLake's expected payload and validate them.

    Expected format of the returned mapping:
    {
        "iss": "authorization-server-endpoint",
        "aud": "healthlake-datastore-endpoint",
        "iat": timestamp,
        "nbf": timestamp,
        "exp": timestamp,
        "isAuthorized": "true",
        "uid": "user-identifier",
        "scope": "system/*.*"
    }

    Args:
        decoded_token: Claims of an already signature-verified token.
        datastore_endpoint: Endpoint of the datastore being accessed; it is
            written into the "aud" claim of the result.

    Returns:
        The mapped claims dictionary.

    Raises:
        TokenValidationError: if a required claim is missing, the token is
            expired or not yet valid, the scope is empty, or no scope starts
            with a user/, system/, patient/ or launch/ prefix.
    """
    current_time = int(time.time())
    issued_at = decoded_token.get('iat', current_time)

    mapped_token = {
        "iss": decoded_token.get('iss'),
        "aud": datastore_endpoint,  # Always the HealthLake datastore endpoint
        "iat": issued_at,
        # Honour the token's own nbf; fall back to iat only when it is absent.
        # (Previously nbf was always copied from iat, so a real not-before
        # constraint in the token was silently ignored.)
        "nbf": decoded_token.get('nbf', issued_at),
        "exp": decoded_token.get('exp'),
        "isAuthorized": "true",  # String "true", not a boolean
        "uid": decoded_token.get('sub', decoded_token.get('username', '')),  # sub, else username
        "scope": decoded_token.get('scope', '')
    }

    # Required claims must be present and truthy
    required_claims = ['aud', 'nbf', 'exp', 'scope']
    missing_claims = [claim for claim in required_claims if not mapped_token.get(claim)]
    if missing_claims:
        raise TokenValidationError(f"Missing required claims: {', '.join(missing_claims)}")

    # Validity window
    if current_time > mapped_token['exp']:
        raise TokenValidationError("Token has expired")
    if current_time < mapped_token['nbf']:
        raise TokenValidationError("Token is not yet valid")

    # Scope is a whitespace-separated list
    scopes = mapped_token['scope'].split()
    if not scopes:
        raise TokenValidationError("Token has empty scope")

    # At least one scope must carry a FHIR resource prefix
    valid_scope_prefixes = ('user/', 'system/', 'patient/', 'launch/')
    has_fhir_scope = any(
        scope.startswith(valid_scope_prefixes)
        for scope in scopes
    )
    if not has_fhir_scope:
        raise TokenValidationError("Token missing required FHIR resource scope")

    logger.info(f"Final mapped token: {json.dumps(mapped_token, default=str)}")
    return mapped_token
def decode_token(token: str) -> Dict[str, Any]:
    """
    Verify a JWT's RS256 signature against the configured JWKS and return its claims.

    The signing key is selected by the "kid" value in the token header.
    Expiry is verified here; audience verification is disabled because the
    audience is handled separately.

    Args:
        token: The raw JWT, without any "Bearer " prefix.

    Returns:
        The decoded claims dictionary.

    Raises:
        TokenValidationError: if the header has no "kid", the JWKS cannot be
            fetched, no key matches, or signature/claim verification fails.
    """
    try:
        headers = jwt.get_unverified_headers(token)
        kid = headers.get('kid')
        if not kid:
            raise TokenValidationError("No 'kid' found in token headers")

        jwks = fetch_jwks()
        public_key = get_public_key(kid, jwks)

        decoded = jwt.decode(
            token,
            public_key,
            algorithms=['RS256'],
            options={
                'verify_exp': True,
                'verify_aud': False  # We handle audience validation separately
            }
        )
        logger.info(f"Token decoded successfully: {json.dumps(decoded, default=str)}")
        return decoded
    except TokenValidationError:
        # Already a specific, caller-facing error (missing kid, JWKS fetch
        # failure, unknown key). Let it through unchanged instead of having the
        # generic handler below re-wrap it as "Token decoding failed: ...".
        raise
    except JWTError as e:
        logger.error(f"JWT validation error: {str(e)}")
        raise TokenValidationError(f"Token validation failed: {str(e)}")
    except Exception as e:
        logger.error(f"Token decoding error: {str(e)}")
        raise TokenValidationError(f"Token decoding failed: {str(e)}")
def fetch_jwks() -> Dict[str, Any]:
    """
    Download and parse the JSON Web Key Set from JWKS_URI.

    Returns:
        The parsed JWKS document.

    Raises:
        TokenValidationError: if the request fails, times out, or the response
            is not valid UTF-8 JSON.
    """
    try:
        # Bound the request: without a timeout a stalled endpoint would block
        # until the Lambda itself times out.
        with urllib.request.urlopen(JWKS_URI, timeout=5) as response:
            return json.loads(response.read().decode('utf-8'))
    except Exception as e:
        logger.error(f"Error fetching JWKS: {str(e)}")
        raise TokenValidationError(f"Failed to fetch JWKS: {str(e)}")
def get_public_key(kid: str, jwks: Dict[str, Any]) -> str:
    """
    Return the JWKS entry whose "kid" equals the given key id, serialized as JSON.

    Args:
        kid: Key id taken from the token header.
        jwks: Parsed JWKS document; its "keys" list is searched in order.

    Raises:
        TokenValidationError: if no entry has a matching "kid".
    """
    candidates = jwks.get('keys', [])
    matching_jwk = next((jwk for jwk in candidates if jwk.get('kid') == kid), None)
    if matching_jwk is None:
        raise TokenValidationError(f"No matching key found for kid: {kid}")
    return json.dumps(matching_jwk)
def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    """
    Lambda handler for SMART on FHIR token validation.

    Reads 'datastoreEndpoint', 'operationName' and 'bearerToken' from the
    event, verifies the token and maps its claims.

    Expected output format on success:
    {
        "authPayload": {
            "iss": "https://authorization-server-endpoint/oauth2/token",
            "aud": "https://healthlake.region.amazonaws.com/datastore/id/r4/",
            "iat": 1677115637,
            "nbf": 1677115637,
            "exp": 1997877061,
            "isAuthorized": "true",
            "uid": "100101",
            "scope": "system/*.*"
        },
        "iamRoleARN": "iam-role-arn"
    }

    On any failure the handler does not raise; it returns
    {"authPayload": {"isAuthorized": "false", "error": "..."}}.
    """
    try:
        # Validate input; report only the fields that are actually absent so
        # the error is actionable.
        required_fields = ['datastoreEndpoint', 'operationName', 'bearerToken']
        missing_fields = [field for field in required_fields if field not in event]
        if missing_fields:
            raise ValueError(f"Missing required fields: {', '.join(missing_fields)}")

        logger.info(f"Processing request for endpoint: {event['datastoreEndpoint']}, "
                    f"operation: {event['operationName']}")

        # Strip an optional "Bearer " prefix
        bearer_token = event['bearerToken']
        token = bearer_token[7:] if bearer_token.startswith('Bearer ') else bearer_token

        # Verify signature/expiry, then map claims to the expected output shape
        decoded_token = decode_token(token)
        auth_payload = validate_token_claims(decoded_token, event['datastoreEndpoint'])

        return {
            'authPayload': auth_payload,
            'iamRoleARN': USER_ROLE_ARN
        }
    except TokenValidationError as e:
        logger.error(f"Token validation error: {str(e)}")
        return {
            'authPayload': {
                'isAuthorized': "false",
                'error': str(e)
            }
        }
    except Exception as e:
        # Deliberate catch-all: anything unexpected is reported as a denial
        # rather than raised.
        logger.error(f"Unexpected error: {str(e)}")
        return {
            'authPayload': {
                'isAuthorized': "false",
                'error': f"Internal error: {str(e)}"
            }
        }
# healthlake/lambda/requirements.txt
# NOTE(review): lambda_function.py imports `jwt` from `jose`, not PyJWT; confirm this pin is still needed.
PyJWT==2.8.0
# Provides the `jose` package imported by lambda_function.py (jwt, JWTError).
python-jose[cryptography]==3.3.0
# NOTE(review): the handler fetches the JWKS with the stdlib urllib.request; confirm urllib3 is required.
urllib3>=2.0.7
# installing dependencies
# Create and activate a Python 3.11 virtual environment for the build.
python3.11 -m venv venv
source venv/bin/activate

# Vendor the requirements into ./package as prebuilt wheels only, selecting
# builds for CPython 3.11 on manylinux2014 x86_64 regardless of the host platform.
pip3 install \
  --requirement requirements.txt \
  --upgrade \
  --only-binary=:all: \
  --target package \
  --platform manylinux2014_x86_64 \
  --implementation cp \
  --python-version 3.11