Skip to content

Instantly share code, notes, and snippets.

@kczpl
Created December 18, 2024 14:43
Show Gist options
  • Save kczpl/438343552c1093d6d2d1f18de5182e26 to your computer and use it in GitHub Desktop.
AWS HealthLake SMART-on-FHIR with Terraform

HealthLake

Terraform

# healthlake/main.tf

## HealthLake ##
# Cognito endpoints reused throughout the SMART discovery metadata below.
locals {
  smart_idp_base_url = "https://${aws_cognito_user_pool_domain.this.domain}.auth.${data.aws_region.current.name}.amazoncognito.com"
  smart_idp_jwks_uri = "https://cognito-idp.${data.aws_region.current.name}.amazonaws.com/${aws_cognito_user_pool.main.id}/.well-known/jwks.json"
}

# FHIR R4 datastore preloaded with Synthea sample data and protected by
# SMART on FHIR: bearer tokens are handed to the token-validator Lambda.
resource "awscc_healthlake_fhir_datastore" "this" {
  datastore_name         = "my-fhir-datastore"
  datastore_type_version = "R4"

  preload_data_config = {
    preload_data_type = "SYNTHEA"
  }

  # Encrypt at rest with the AWS-owned key (no customer-managed key).
  sse_configuration = {
    kms_encryption_config = {
      cmk_type   = "AWS_OWNED_KMS_KEY"
      kms_key_id = null
    }
  }

  identity_provider_configuration = {
    authorization_strategy             = "SMART_ON_FHIR_V1"
    fine_grained_authorization_enabled = true
    idp_lambda_arn                     = aws_lambda_function.token_validator.arn

    # Authorization-server discovery document, serialized as a JSON string.
    # NOTE(review): the register/introspect/userInfo-as-management endpoints
    # are declared here as-is; confirm the identity provider really serves them.
    metadata = jsonencode({
      # Core endpoints
      issuer                 = local.smart_idp_base_url
      authorization_endpoint = "${local.smart_idp_base_url}/oauth2/authorize"
      token_endpoint         = "${local.smart_idp_base_url}/oauth2/token"
      jwks_uri               = local.smart_idp_jwks_uri

      # Auxiliary endpoints
      registration_endpoint  = "${local.smart_idp_base_url}/oauth2/register"
      management_endpoint    = "${local.smart_idp_base_url}/oauth2/userInfo"
      introspection_endpoint = "${local.smart_idp_base_url}/oauth2/introspect"
      revocation_endpoint    = "${local.smart_idp_base_url}/oauth2/revoke"

      # Supported flows and response shapes
      response_types_supported = ["code", "token"]
      response_modes_supported = ["query", "fragment", "form_post"]
      grant_types_supported = [
        "authorization_code",
        "implicit",
        "refresh_token",
        "password",
        "client_credentials",
      ]
      code_challenge_methods_supported = ["S256"]
      subject_types_supported          = ["public"]

      # Scopes and claims
      scopes_supported = [
        "openid",
        "profile",
        "email",
        "phone",
        "launch/patient",
        "system/*.*",
        "patient/*.read",
      ]
      claims_supported = [
        "ver",
        "jti",
        "iss",
        "aud",
        "iat",
        "exp",
        "cid",
        "uid",
        "scp",
        "sub",
      ]

      # Client authentication
      token_endpoint_auth_methods_supported = [
        "client_secret_basic",
        "client_secret_post",
      ]
      revocation_endpoint_auth_methods_supported = [
        "client_secret_basic",
        "client_secret_post",
        "client_secret_jwt",
        "private_key_jwt",
        "none",
      ]

      # Request objects
      request_parameter_supported = true
      request_object_signing_alg_values_supported = [
        "HS256",
        "HS384",
        "HS512",
        "RS256",
        "RS384",
        "RS512",
        "ES256",
        "ES384",
        "ES512",
      ]

      # SMART capabilities advertised to clients
      capabilities = [
        "launch-ehr",
        "sso-openid-connect",
        "client-public",
      ]
    })
  }

  # Changes to the identity provider configuration are deliberately not
  # tracked after the datastore has been created.
  lifecycle {
    ignore_changes = [identity_provider_configuration]
  }
}

## Cognito ##
# Cognito user pool whose ID and hosted domain are referenced by the HealthLake
# datastore's identity provider metadata and by the token-validator Lambda.
resource "aws_cognito_user_pool" "main" {
  name                = "my-cognito-user-pool"
  deletion_protection = "INACTIVE" # set to "ACTIVE" to block accidental deletion
  mfa_configuration   = "OFF"      # alternatives: "ON" / "OPTIONAL"

  # Users may sign in with their email; email addresses are verified automatically.
  alias_attributes         = ["email"]
  auto_verified_attributes = ["email"]

  # Account recovery is only possible through a verified email address.
  account_recovery_setting {
    recovery_mechanism {
      name     = "verified_email"
      priority = 1
    }
  }

  # Only administrators can create users.
  admin_create_user_config {
    allow_admin_create_user_only = true
  }

  # Outbound email is sent through Cognito's built-in sender.
  email_configuration {
    email_sending_account = "COGNITO_DEFAULT"
  }

  # At least 8 characters with lower/upper case, digits and symbols;
  # temporary passwords are valid for 7 days.
  password_policy {
    minimum_length                   = 8
    require_lowercase                = true
    require_numbers                  = true
    require_symbols                  = true
    require_uppercase                = true
    temporary_password_validity_days = 7
  }

  # `email` is a required, mutable string attribute of up to 2048 characters.
  schema {
    attribute_data_type      = "String"
    developer_only_attribute = false
    mutable                  = true
    name                     = "email"
    required                 = true

    string_attribute_constraints {
      max_length = "2048"
      min_length = "0"
    }
  }

  # A changed email address must be verified before the update takes effect.
  user_attribute_update_settings {
    attributes_require_verification_before_update = ["email"]
  }

  # Usernames are compared case-insensitively.
  username_configuration {
    case_sensitive = false
  }

  # Verification emails carry a confirmation code.
  verification_message_template {
    default_email_option = "CONFIRM_WITH_CODE"
  }
}

# Hosted domain prefix for the user pool; this file builds the OAuth endpoint
# URLs as https://<domain>.auth.<region>.amazoncognito.com/...
resource "aws_cognito_user_pool_domain" "this" {
  domain       = "smart-fhir-test"
  user_pool_id = aws_cognito_user_pool.main.id
}

# Resource server "launch" with scope "patient"; the app client in this file
# requests it as `launch/patient`.
resource "aws_cognito_resource_server" "launch" {
  identifier   = "launch"
  name         = "launch"
  user_pool_id = aws_cognito_user_pool.main.id

  scope {
    scope_name        = "patient"
    scope_description = "Request patient data"
  }
}

# Resource server "system" with scope "*.*"; the app client in this file
# requests it as `system/*.*`.
resource "aws_cognito_resource_server" "system" {
  identifier   = "system"
  name         = "system"
  user_pool_id = aws_cognito_user_pool.main.id

  scope {
    scope_name        = "*.*"
    scope_description = "Full system access"
  }
}

# Resource server "patient" with scope "*.read"; the app client in this file
# requests it as `patient/*.read`.
resource "aws_cognito_resource_server" "patient" {
  identifier   = "patient"
  name         = "patient"
  user_pool_id = aws_cognito_user_pool.main.id

  scope {
    scope_name        = "*.read"
    scope_description = "Read patient data"
  }
}

# Confidential app client (with a generated secret) configured for the OAuth
# client-credentials flow against the three custom resource-server scopes.
resource "aws_cognito_user_pool_client" "client" {
  name         = "my-cognito-client"
  user_pool_id = aws_cognito_user_pool.main.id

  generate_secret = true

  prevent_user_existence_errors        = "ENABLED"
  allowed_oauth_flows                  = ["client_credentials"] # use ["code"] for the authorization-code flow
  allowed_oauth_flows_user_pool_client = true

  # Scopes for the client-credentials flow (custom resource-server scopes only).
  allowed_oauth_scopes = [
    "launch/patient",
    "system/*.*",
    "patient/*.read"
  ]

  # Alternative scope list for the authorization-code flow; swap it in together
  # with allowed_oauth_flows = ["code"].
  # allowed_oauth_scopes = [
  #   "openid",
  #   "profile",
  #   "email",
  #   "phone",
  #   "launch/patient",
  #   "system/*.*",
  #   "patient/*.read"
  # ]

  callback_urls = ["https://localhost"]
  logout_urls   = []

  supported_identity_providers = ["COGNITO"]

  # Token lifetimes; the unit (minutes) is set in token_validity_units below.
  access_token_validity  = 60 # 1 hour
  id_token_validity      = 60 # 1 hour
  refresh_token_validity = 60 # 1 hour

  # User attributes this client may read.
  read_attributes = [
    "address", "birthdate", "email", "email_verified", "family_name",
    "gender", "given_name", "locale", "middle_name", "name", "nickname",
    "phone_number", "phone_number_verified", "picture", "preferred_username",
    "profile", "updated_at", "website", "zoneinfo"
  ]

  # User attributes this client may write (the *_verified flags are not included).
  write_attributes = [
    "address", "birthdate", "email", "family_name", "gender", "given_name",
    "locale", "middle_name", "name", "nickname", "phone_number", "picture",
    "preferred_username", "profile", "updated_at", "website", "zoneinfo"
  ]

  token_validity_units {
    access_token  = "minutes"
    id_token      = "minutes"
    refresh_token = "minutes"
  }

  # The scopes above are plain strings, so Terraform cannot infer that the
  # resource servers must exist first; the ordering is declared explicitly.
  depends_on = [
    aws_cognito_resource_server.launch,
    aws_cognito_resource_server.system,
    aws_cognito_resource_server.patient
  ]
}

# Test user created in the pool with a pre-verified email address.
# NOTE(review): the password is hard-coded in source (and will end up in the
# Terraform state); use only for throwaway test environments or move it to a
# sensitive input variable.
resource "aws_cognito_user" "test_user" {
  user_pool_id = aws_cognito_user_pool.main.id
  username     = "test"
  password     = "Password1234!"

  attributes = {
    preferred_username = "testadmin"
    email              = "test@test.com"
    email_verified     = true
  }
}


## Lambda ##
# Region of the configured AWS provider; used in this file to build the
# Cognito hosted-domain and JWKS URLs.
data "aws_region" "current" {}
# Identity (account) of the credentials Terraform is running with.
data "aws_caller_identity" "current" {}

# Zips the vendored Lambda package (handler plus pip-installed dependencies).
# Paths are anchored to path.module so the archive resolves correctly even
# when this configuration is called as a child module from another directory.
data "archive_file" "lambda_zip" {
  type        = "zip"
  source_dir  = "${path.module}/lambda/package"
  output_path = "${path.module}/lambda/lambda_function.zip"

  # Keep bytecode caches and packaging metadata out of the deployment archive.
  excludes = ["__pycache__", "*.pyc", "*.dist-info"]
}

# Token-validator Lambda that HealthLake invokes for every SMART on FHIR
# request (see idp_lambda_arn on the datastore).
resource "aws_lambda_function" "token_validator" {
  function_name = "smart-fhir-auth-lambda"
  role          = aws_iam_role.lambda_role.arn
  handler       = "lambda_function.lambda_handler"
  runtime       = "python3.11"
  architectures = ["x86_64"]
  timeout       = 60
  memory_size   = 512

  filename = data.archive_file.lambda_zip.output_path
  # Take the hash from the archive data source instead of filebase64sha256():
  # the file function is evaluated before the archive is (re)built, so it
  # fails when the zip does not exist yet and can see a stale file otherwise.
  source_code_hash = data.archive_file.lambda_zip.output_base64sha256

  environment {
    variables = {
      CLIENT_ID = aws_cognito_user_pool_client.client.id
      # NOTE(review): the client secret is exposed as a plain environment
      # variable; the handler reads it at import time, so it must stay set.
      CLIENT_SECRET = aws_cognito_user_pool_client.client.client_secret
      JWKS_URI      = "https://cognito-idp.${data.aws_region.current.name}.amazonaws.com/${aws_cognito_user_pool.main.id}/.well-known/jwks.json"
      # Returned by the handler as `iamRoleARN` in its response.
      USER_ROLE_ARN = aws_iam_role.healthlake_service_role.arn
      USER_POOL_ID  = aws_cognito_user_pool.main.id
    }
  }
}

# Role that the HealthLake service principal may assume. Its ARN is passed to
# the token-validator Lambda as USER_ROLE_ARN.
resource "aws_iam_role" "healthlake_service_role" {
  name        = "my-healthlake-service-role"
  description = "Service role for AWS HealthLake FHIR operations"

  # Trust policy: only healthlake.amazonaws.com may assume this role.
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Principal = {
          Service = "healthlake.amazonaws.com"
        }
        Action = "sts:AssumeRole"
      }
    ]
  })
}

# Attaches the AWS-managed HealthLake full-access policy to the service role.
# NOTE(review): full access is broad; consider a narrower policy for production.
resource "aws_iam_role_policy_attachment" "healthlake_policy" {
  role       = aws_iam_role.healthlake_service_role.name
  policy_arn = "arn:aws:iam::aws:policy/AmazonHealthLakeFullAccess"
}

# Execution role for the token-validator Lambda.
resource "aws_iam_role" "lambda_role" {
  name = "smart-fhir-auth-lambda-role"

  # Trust policy: only the Lambda service may assume this role.
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = "sts:AssumeRole"
        Effect = "Allow"
        Principal = {
          Service = "lambda.amazonaws.com"
        }
      }
    ]
  })
}

# Attaches the AWS-managed basic execution policy to the Lambda role.
resource "aws_iam_role_policy_attachment" "lambda_basic" {
  policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
  role       = aws_iam_role.lambda_role.name
}

# Inline policy allowing the Lambda role to call cognito-idp:GetUser on any resource.
# NOTE(review): the handler shipped with this configuration only fetches the
# public JWKS over HTTPS and makes no Cognito API calls — confirm this grant
# is still needed.
resource "aws_iam_role_policy" "cognito_access" {
  name = "smart-fhir-auth-lambda-cognito-access"
  role = aws_iam_role.lambda_role.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "cognito-idp:GetUser"
        ]
        Resource = "*"
      }
    ]
  })
}

# Resource-based permission letting the HealthLake service invoke the
# token-validator Lambda.
resource "aws_lambda_permission" "healthlake" {
  statement_id  = "healthlake"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.token_validator.function_name
  principal     = "healthlake.amazonaws.com"

  # Confused-deputy protection: accept invocations only when HealthLake is
  # acting on behalf of this AWS account, not any account using the service.
  source_account = data.aws_caller_identity.current.account_id
}

# Exposes the Terraform IDs of the three Cognito resource servers.
# NOTE(review): `.id` is the resource server's ID, not the scope string; the
# aws_cognito_resource_server `scope_identifiers` attribute holds values such
# as "launch/patient" — confirm which one consumers of this output expect.
output "cognito_scopes" {
  description = "Cognito scopes"
  value = {
    launch  = aws_cognito_resource_server.launch.id
    system  = aws_cognito_resource_server.system.id
    patient = aws_cognito_resource_server.patient.id
  }
}

# OAuth 2.0 endpoints on the Cognito hosted domain, plus the pool's JWKS URL.
output "cognito_oauth_endpoints" {
  description = "OAuth endpoints for Cognito"
  value = {
    authorization = format(
      "https://%s.auth.%s.amazoncognito.com/oauth2/authorize",
      aws_cognito_user_pool_domain.this.domain,
      data.aws_region.current.name,
    )
    token = format(
      "https://%s.auth.%s.amazoncognito.com/oauth2/token",
      aws_cognito_user_pool_domain.this.domain,
      data.aws_region.current.name,
    )
    userinfo = format(
      "https://%s.auth.%s.amazoncognito.com/oauth2/userInfo",
      aws_cognito_user_pool_domain.this.domain,
      data.aws_region.current.name,
    )
    jwks = format(
      "https://cognito-idp.%s.amazonaws.com/%s/.well-known/jwks.json",
      data.aws_region.current.name,
      aws_cognito_user_pool.main.id,
    )
  }
}

# Base URL of the Cognito hosted domain (no path component).
output "cognito_domain" {
  description = "Cognito domain"
  value = format(
    "https://%s.auth.%s.amazoncognito.com",
    aws_cognito_user_pool_domain.this.domain,
    data.aws_region.current.name,
  )
}

# App client ID and secret; marked sensitive so Terraform redacts the value in
# plan/apply output.
output "cognito_client_credentials" {
  description = "Cognito Client credentials"
  value = {
    client_id     = aws_cognito_user_pool_client.client.id
    client_secret = aws_cognito_user_pool_client.client.client_secret
  }
  sensitive = true
}

# Endpoint of the HealthLake datastore, as reported by the provider.
output "datastore_endpoint" {
  description = "HealthLake FHIR datastore endpoint"
  value       = awscc_healthlake_fhir_datastore.this.datastore_endpoint
}

# ARN of the HealthLake datastore, as reported by the provider.
output "datastore_arn" {
  description = "HealthLake FHIR datastore ARN"
  value       = awscc_healthlake_fhir_datastore.this.datastore_arn
}

Lambda

#  healthlake/lambda/package/lambda_function.py
import base64
import logging
import json
import os
import urllib.request
from typing import Dict, Any
from datetime import datetime
import time
from jose import jwt
from jose.exceptions import JWTError

# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Get environment variables
CLIENT_ID = os.environ['CLIENT_ID']
CLIENT_SECRET = os.environ['CLIENT_SECRET']
JWKS_URI = os.environ['JWKS_URI']
USER_ROLE_ARN = os.environ['USER_ROLE_ARN']
USER_POOL_ID = os.environ['USER_POOL_ID']

class TokenValidationError(Exception):
    """Signals that a bearer token could not be verified or has unacceptable claims."""

def validate_token_claims(decoded_token: Dict[str, Any], datastore_endpoint: str) -> Dict[str, Any]:
    """
    Map verified JWT claims onto HealthLake's expected payload and validate them.

    The returned payload has this shape:
    {
        "iss": "authorization-server-endpoint",
        "aud": "healthlake-datastore-endpoint",
        "iat": timestamp,
        "nbf": timestamp,
        "exp": timestamp,
        "isAuthorized": "true",
        "uid": "user-identifier",
        "scope": "system/*.*"
    }

    Args:
        decoded_token: Claims of an already signature-verified JWT.
        datastore_endpoint: Endpoint of the datastore being accessed; it is
            written into the payload's ``aud`` claim.

    Returns:
        The mapped claim dictionary with ``isAuthorized`` set to ``"true"``.

    Raises:
        TokenValidationError: If a required claim is missing or malformed, the
            token is expired or not yet valid, or no scope starts with one of
            ``user/``, ``system/``, ``patient/`` or ``launch/``.
    """
    current_time = int(time.time())
    issued_at = decoded_token.get('iat', current_time)

    # Extract base claims
    mapped_token = {
        "iss": decoded_token.get('iss'),
        "aud": datastore_endpoint,  # Set to HealthLake datastore endpoint
        "iat": issued_at,
        # Honour the token's own nbf; fall back to iat only when it is absent.
        "nbf": decoded_token.get('nbf', issued_at),
        "exp": decoded_token.get('exp'),
        "isAuthorized": "true",  # String "true", not a boolean
        "uid": decoded_token.get('sub', decoded_token.get('username', '')),  # Use sub or username as uid
        "scope": decoded_token.get('scope', '')
    }

    # Validate required claims
    required_claims = ['aud', 'nbf', 'exp', 'scope']
    missing_claims = [claim for claim in required_claims if not mapped_token.get(claim)]
    if missing_claims:
        raise TokenValidationError(f"Missing required claims: {', '.join(missing_claims)}")

    # Reject malformed timestamps explicitly so they surface as a token
    # validation failure rather than a TypeError in the comparisons below.
    for claim in ('exp', 'nbf'):
        value = mapped_token[claim]
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            raise TokenValidationError(f"Claim '{claim}' must be a numeric timestamp")

    # Validate timestamps
    if current_time > mapped_token['exp']:
        raise TokenValidationError("Token has expired")
    if current_time < mapped_token['nbf']:
        raise TokenValidationError("Token is not yet valid")

    # Validate scope format and presence
    if not isinstance(mapped_token['scope'], str):
        raise TokenValidationError("Claim 'scope' must be a space-delimited string")
    scopes = mapped_token['scope'].split()
    if not scopes:
        raise TokenValidationError("Token has empty scope")

    # Validate at least one FHIR resource scope exists
    valid_scope_prefixes = ('user/', 'system/', 'patient/', 'launch/')
    if not any(scope.startswith(valid_scope_prefixes) for scope in scopes):
        raise TokenValidationError("Token missing required FHIR resource scope")

    logger.info(f"Final mapped token: {json.dumps(mapped_token, default=str)}")
    return mapped_token

def decode_token(token: str) -> Dict[str, Any]:
    """
    Verify a JWT's RS256 signature against the JWKS and return its claims.

    The signing key is selected by the ``kid`` in the token header. Expiry is
    verified during decoding; audience verification is switched off here.

    Args:
        token: The encoded JWT (without any ``Bearer `` prefix).

    Returns:
        The decoded claims dictionary.

    Raises:
        TokenValidationError: If the header has no ``kid``, the JWKS cannot be
            fetched, no key matches, or signature/claim verification fails.
    """
    try:
        headers = jwt.get_unverified_headers(token)
        kid = headers.get('kid')
        if not kid:
            raise TokenValidationError("No 'kid' found in token headers")

        jwks = fetch_jwks()
        public_key = get_public_key(kid, jwks)

        decoded = jwt.decode(
            token,
            public_key,
            algorithms=['RS256'],
            options={
                'verify_exp': True,
                'verify_aud': False  # Audience is not checked at this stage
            }
        )

        logger.info(f"Token decoded successfully: {json.dumps(decoded, default=str)}")
        return decoded

    except TokenValidationError:
        # Already a precise validation failure (missing kid, JWKS fetch error,
        # unknown key); re-raise untouched instead of letting the generic
        # handler below re-wrap it as "Token decoding failed: ...".
        raise
    except JWTError as e:
        logger.error(f"JWT validation error: {str(e)}")
        raise TokenValidationError(f"Token validation failed: {str(e)}") from e
    except Exception as e:
        logger.error(f"Token decoding error: {str(e)}")
        raise TokenValidationError(f"Token decoding failed: {str(e)}") from e

def fetch_jwks(timeout: float = 5.0) -> Dict[str, Any]:
    """
    Download and parse the JSON Web Key Set published at ``JWKS_URI``.

    Args:
        timeout: Maximum number of seconds to wait on the HTTP request. A
            bounded wait keeps a stalled connection from holding the
            invocation open until the function-level timeout.

    Returns:
        The parsed JWKS document.

    Raises:
        TokenValidationError: If the request fails, times out, or the response
            is not valid UTF-8 JSON.
    """
    try:
        with urllib.request.urlopen(JWKS_URI, timeout=timeout) as response:
            return json.loads(response.read().decode('utf-8'))
    except Exception as e:
        logger.error(f"Error fetching JWKS: {str(e)}")
        raise TokenValidationError(f"Failed to fetch JWKS: {str(e)}") from e

def get_public_key(kid: str, jwks: Dict[str, Any]) -> str:
    """
    Look up the JWKS entry whose ``kid`` equals the given key ID.

    Returns the first matching entry serialized as a JSON string; raises
    TokenValidationError when the key set contains no such entry.
    """
    candidates = jwks.get('keys', [])
    matching_entry = next(
        (entry for entry in candidates if entry.get('kid') == kid),
        None,
    )
    if matching_entry is None:
        raise TokenValidationError(f"No matching key found for kid: {kid}")
    return json.dumps(matching_entry)

def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    """
    Lambda handler for SMART on FHIR token validation.

    Args:
        event: Invocation payload; must contain ``datastoreEndpoint``,
            ``operationName`` and ``bearerToken``.
        context: Lambda context object (unused).

    Returns:
        On success:
        {
            "authPayload": {
                "iss": "https://authorization-server-endpoint/oauth2/token",
                "aud": "https://healthlake.region.amazonaws.com/datastore/id/r4/",
                "iat": 1677115637,
                "nbf": 1677115637,
                "exp": 1997877061,
                "isAuthorized": "true",
                "uid": "100101",
                "scope": "system/*.*"
            },
            "iamRoleARN": "iam-role-arn"
        }
        On any failure, ``{"authPayload": {"isAuthorized": "false", "error": ...}}``;
        the handler never raises.
    """
    try:
        # Validate input, naming only the fields that are actually absent
        required_fields = ['datastoreEndpoint', 'operationName', 'bearerToken']
        missing_fields = [field for field in required_fields if field not in event]
        if missing_fields:
            raise ValueError(f"Missing required fields: {', '.join(missing_fields)}")

        logger.info(f"Processing request for endpoint: {event['datastoreEndpoint']}, "
                   f"operation: {event['operationName']}")

        # Extract token from bearer string. The auth scheme name is
        # case-insensitive, so "bearer " / "BEARER " are stripped as well.
        bearer_token = event['bearerToken']
        if bearer_token[:7].lower() == 'bearer ':
            token = bearer_token[7:].strip()
        else:
            token = bearer_token.strip()

        # Decode and validate token
        decoded_token = decode_token(token)

        # Format claims to match expected output
        auth_payload = validate_token_claims(decoded_token, event['datastoreEndpoint'])

        return {
            'authPayload': auth_payload,
            'iamRoleARN': USER_ROLE_ARN
        }

    except TokenValidationError as e:
        logger.error(f"Token validation error: {str(e)}")
        return {
            'authPayload': {
                'isAuthorized': "false",
                'error': str(e)
            }
        }
    except Exception as e:
        # Anything unexpected (bad event shape, programming error) still
        # produces an explicit deny instead of a failed invocation.
        logger.error(f"Unexpected error: {str(e)}")
        return {
            'authPayload': {
                'isAuthorized': "false",
                'error': f"Internal error: {str(e)}"
            }
        }
# healthlake/lambda/requirements.txt
# NOTE(review): lambda_function.py imports only `jose` (python-jose) and the
# standard library; PyJWT and urllib3 are not imported there — confirm they
# are still needed before shipping them in the package.
PyJWT==2.8.0
python-jose[cryptography]==3.3.0
urllib3>=2.0.7
# Installing dependencies
# Presumably run from healthlake/lambda/, so that ./package is the directory
# Terraform zips — confirm against the archive_file source_dir.
python3.11 -m venv venv
source venv/bin/activate

# Install prebuilt CPython 3.11 manylinux2014 x86_64 wheels into ./package,
# matching the Lambda's python3.11 / x86_64 configuration. --only-binary
# forbids source builds, so nothing is compiled against the local machine.
pip3 install \
--platform manylinux2014_x86_64 \
--target=package \
--implementation cp \
--python-version 3.11 \
--only-binary=:all: --upgrade -r requirements.txt
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment