Skip to content

Instantly share code, notes, and snippets.

@oneamitj
Created August 10, 2022 16:29
Show Gist options
  • Save oneamitj/c97e166f74c44b37dfdbe39419ffdb0d to your computer and use it in GitHub Desktop.
Save oneamitj/c97e166f74c44b37dfdbe39419ffdb0d to your computer and use it in GitHub Desktop.
Lambda Function to use as authentication provider with user management from Secret Manager for Transfer Family.
# This code is part of Transfer Family Tutorial: https://aws.amazon.com/blogs/storage/enable-password-authentication-for-aws-transfer-family-using-aws-secrets-manager-updated/
# Full zip with CloudFormation code mentioned on that tutorial: https://s3.amazonaws.com/aws-transfer-resources/custom-idp-templates/aws-transfer-custom-idp-secrets-manager-sourceip-protocol-support-apig.zip
# My step wise spinning up SFTP in Transfer Family with username and password:
import os
import json
import boto3
import base64
from ipaddress import ip_network, ip_address
from botocore.exceptions import ClientError
def lambda_handler(event, context):
# Get the required parameters
required_param_list = ["serverId", "username", "protocol", "sourceIp"]
for parameter in required_param_list:
if parameter not in event:
print("Incoming " + parameter + " missing - Unexpected")
return {}
input_serverId = event["serverId"]
input_username = event["username"]
input_protocol = event["protocol"]
input_sourceIp = event["sourceIp"]
input_password = event.get("password", "")
print("ServerId: {}, Username: {}, Protocol: {}, SourceIp: {}"
.format(input_serverId, input_username, input_protocol, input_sourceIp))
# Check for password and set authentication type appropriately. No password means SSH auth
print("Start User Authentication Flow")
if input_password != "":
print("Using PASSWORD authentication")
authentication_type = "PASSWORD"
else:
if input_protocol == 'FTP' or input_protocol == 'FTPS':
print("Empty password not allowed for FTP/S")
return {}
print("Using SSH authentication")
authentication_type = "SSH"
# Retrieve our user details from the secret. For all key-value pairs stored in SecretManager,
# checking the protocol-specified secret first, then use generic ones.
# e.g. If SFTPPassword and Password both exists, will be using SFTPPassword for authentication
secret = get_secret(input_serverId + "/" + input_username)
if secret is not None:
secret_dict = json.loads(secret)
# Run our password checks
user_authenticated = authenticate_user(authentication_type, secret_dict, input_password, input_protocol)
# Run sourceIp checks
ip_match = check_ipaddress(secret_dict, input_sourceIp, input_protocol)
if user_authenticated and ip_match:
print("User authenticated, calling build_response with: " + authentication_type)
return build_response(secret_dict, authentication_type, input_protocol)
else:
print("User failed authentication return empty response")
return {}
else:
# Otherwise something went wrong. Most likely the object name is not there
print("Secrets Manager exception thrown - Returning empty response")
# Return an empty data response meaning the user was not authenticated
return {}
def lookup(secret_dict, key, input_protocol):
if input_protocol + key in secret_dict:
print("Found protocol-specified {}".format(key))
return secret_dict[input_protocol + key]
else:
return secret_dict.get(key, None)
def check_ipaddress(secret_dict, input_sourceIp, input_protocol):
accepted_ip_network = lookup(secret_dict, "AcceptedIpNetwork", input_protocol)
if not accepted_ip_network:
# No IP provided so skip checks
print("No IP range provided - Skip IP check")
return True
net = ip_network(accepted_ip_network)
if ip_address(input_sourceIp) in net:
print("Source IP address match")
return True
else:
print("Source IP address not in range")
return False
def authenticate_user(auth_type, secret_dict, input_password, input_protocol):
# Function returns True if: auth_type is password and passwords match or auth_type is SSH. Otherwise returns False
if auth_type == "SSH":
# Place for additional checks in future
print("Skip password check as SSH login request")
return True
# auth_type could only be SSH or PASSWORD
else:
# Retrieve the password from the secret if exists
password = lookup(secret_dict, "Password", input_protocol)
if not password:
print("Unable to authenticate user - No field match in Secret for password")
return False
if input_password == password:
return True
else:
print("Unable to authenticate user - Incoming password does not match stored")
return False
# Build out our response data for an authenticated response
def build_response(secret_dict, auth_type, input_protocol):
response_data = {}
# Check for each key value pair. These are required so set to empty string if missing
role = lookup(secret_dict, "Role", input_protocol)
if role:
response_data["Role"] = role
else:
print("No field match for role - Set empty string in response")
response_data["Role"] = ""
# These are optional so ignore if not present
policy = lookup(secret_dict, "Policy", input_protocol)
if policy:
response_data["Policy"] = policy
# External Auth providers support chroot and virtual folder assignments so we'll check for that
home_directory_details = lookup(secret_dict, "HomeDirectoryDetails", input_protocol)
if home_directory_details:
print("HomeDirectoryDetails found - Applying setting for virtual folders - "
"Note: Cannot be used in conjunction with key: HomeDirectory")
response_data["HomeDirectoryDetails"] = home_directory_details
# If we have a virtual folder setup then we also need to set HomeDirectoryType to "Logical"
print("Setting HomeDirectoryType to LOGICAL")
response_data["HomeDirectoryType"] = "LOGICAL"
# Note that HomeDirectory and HomeDirectoryDetails / Logical mode
# can't be used together but we're not checking for this
home_directory = lookup(secret_dict, "HomeDirectory", input_protocol)
if home_directory:
print("HomeDirectory found - Note: Cannot be used in conjunction with key: HomeDirectoryDetails")
response_data["HomeDirectory"] = home_directory
if auth_type == "SSH":
public_key = lookup(secret_dict, "PublicKey", input_protocol)
if public_key:
response_data["PublicKeys"] = [public_key]
else:
# SSH Auth Flow - We don't have keys so we can't help
print("Unable to authenticate user - No public keys found")
return {}
return response_data
def get_secret(id):
region = os.environ["SecretsManagerRegion"]
print("Secrets Manager Region: " + region)
print("Secret Name: " + id)
# Create a Secrets Manager client
client = boto3.session.Session().client(service_name="secretsmanager", region_name=region)
try:
resp = client.get_secret_value(SecretId=id)
# Decrypts secret using the associated KMS CMK.
# Depending on whether the secret is a string or binary, one of these fields will be populated.
if "SecretString" in resp:
print("Found Secret String")
return resp["SecretString"]
else:
print("Found Binary Secret")
return base64.b64decode(resp["SecretBinary"])
except ClientError as err:
print("Error Talking to SecretsManager: " + err.response["Error"]["Code"] + ", Message: " +
err.response["Error"]["Message"])
return None
@karageee
Copy link

running it non public will cause error on the parameter's check, specifically line 19 due to being unable to use the endpoint

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment