Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
#
# This is an example of an AWS Lambda function implementing a Cognito CUSTOM_AUTH flow.
# The custom auth flow accepts a JWT token as its sole credential and returns AUTHENTICATED when
# the JWT token is valid.
#
import os
from utils import jwt_apple
def define_auth_challenge(event):
    """Handle the Cognito DefineAuthChallenge trigger.

    Looks at the challenge history in ``event['request']['session']`` and
    fills ``event['response']`` with the next step:

    * last entry is a ``CUSTOM_CHALLENGE`` with a truthy ``challengeResult``:
      issue tokens;
    * history is empty, or holds a single ``SRP_A`` entry: present the
      custom challenge;
    * anything else: fail the authentication.

    Returns the same ``event`` object that was passed in.
    """
    print('Define Auth Challenge')

    history = event['request']['session']
    outcome = event['response']
    rounds = len(history)
    last_step = history[-1] if rounds > 0 else None

    # The most recent round was our custom challenge and it was answered correctly.
    answered_correctly = (
        last_step is not None
        and last_step['challengeName'] == 'CUSTOM_CHALLENGE'
        and last_step['challengeResult']
    )

    # Nothing has been asked yet: the AWS CLI starts with an empty session,
    # Amplify starts with a single SRP_A round that we bypass.
    must_start_challenge = rounds == 0 or (
        rounds == 1 and last_step['challengeName'] == 'SRP_A'
    )

    if answered_correctly:
        outcome['challengeName'] = 'CUSTOM_CHALLENGE'
        outcome['issueTokens'] = True
        outcome['failAuthentication'] = False
    elif must_start_challenge:
        outcome['issueTokens'] = False
        outcome['failAuthentication'] = False
        outcome['challengeName'] = 'CUSTOM_CHALLENGE'
    else:
        # Every other history (wrong answer, unexpected sequence) is rejected.
        outcome['issueTokens'] = False
        outcome['failAuthentication'] = True

    return event
def create_auth_challenge(event):
    """Handle the Cognito CreateAuthChallenge trigger.

    Writes the challenge description into ``event['response']``: public
    parameters telling the client to present an Apple-issued JWT, an empty
    set of private parameters, and the ``IDP_TOKEN`` metadata tag.

    Returns the same ``event`` object that was passed in.
    """
    print('Create Auth Challenge')

    event['response'].update(
        publicChallengeParameters={
            'challenge': 'present a valid JWT token issued by a recognized provider',
            'providers': 'Apple',
        },
        # Nothing needs to be kept server-side between create and verify.
        privateChallengeParameters={},
        challengeMetadata="IDP_TOKEN",
    )
    return event
def verify_auth_challenge_response(event):
    """Handle the Cognito VerifyAuthChallengeResponse trigger.

    The challenge answer must have the form ``<provider name>:::<token>``.
    Only the ``apple`` provider (compared case-insensitively) is accepted;
    the token part is passed to ``jwt_apple.decode_apple_user_token``.

    Args:
        event: The trigger event. Reads ``event['request']['challengeAnswer']``
            and, when present, ``event['request']['testing_key']`` (forwarded
            to the decoder as its second argument, for testing only). Writes
            ``event['response']['answerCorrect']``.

    Returns:
        The same ``event`` object, with ``answerCorrect`` set to ``False``
        when the separator is missing, the provider is not Apple, or the
        decoder returned nothing; ``True`` otherwise.

    Raises:
        Any exception raised by ``jwt_apple.decode_apple_user_token`` is
        propagated unchanged.
    """
    print('Verify Auth Challenge Response')

    # verify JWT Token received
    # https://sarunw.com/posts/sign-in-with-apple-3/
    idp_token = event['request']['challengeAnswer']
    # Only a short prefix is logged so the credential never lands in the logs.
    print(f'IDTOKEN to verify : {idp_token[0:10]}')

    # The answer is expected in the form PROVIDER_NAME:::TOKEN
    TOKEN_SEPARATOR = ':::'
    # partition() yields an empty separator when ':::' is absent, so a single
    # scan both detects the malformed case and splits the well-formed one.
    provider, separator, token = idp_token.partition(TOKEN_SEPARATOR)

    if not separator:
        print(f'There is no token separtor in the string received. ' + \
              'Token must be in the form <provider name>:::<base 64 encoded token>')
        event['response']['answerCorrect'] = False
        return event

    # we only accept apple tokens
    if provider.lower() != 'apple':
        print(f'Invalid token provider : {provider}')
        event['response']['answerCorrect'] = False
        return event

    # For testing only: an explicit key may be supplied in the request.
    if 'testing_key' in event['request']:
        claim = jwt_apple.decode_apple_user_token(token, event['request']['testing_key'])
    else:
        claim = jwt_apple.decode_apple_user_token(token)
    print(claim)

    # Fail closed: a decoder that returns None / empty claims instead of
    # raising must not result in a successful authentication.
    # NOTE(review): the decoded claims are not compared with the Cognito user
    # being authenticated (e.g. claim subject vs event['userName']) in this
    # function - confirm whether jwt_apple or another layer enforces that.
    event['response']['answerCorrect'] = bool(claim)
    return event
def pre_signup(event):
    """Handle the Cognito PreSignUp trigger.

    Marks the new user as confirmed and the e-mail address as verified by
    setting ``autoConfirmUser`` and ``autoVerifyEmail`` to ``True`` in
    ``event['response']``. Returns the same ``event`` object.
    """
    print('PreSignUp')
    for flag in ('autoConfirmUser', 'autoVerifyEmail'):
        event['response'][flag] = True
    return event
def _redact_challenge_answer(event):
    """Return a version of ``event`` that is safe to print.

    When ``event['request']`` carries a ``challengeAnswer``, a shallow copy is
    returned in which only the first 10 characters of the answer are kept.
    Events without a challenge answer are returned as-is. The original
    ``event`` is never modified.
    """
    request = event.get('request') if isinstance(event, dict) else None
    if not isinstance(request, dict) or 'challengeAnswer' not in request:
        return event

    safe_request = dict(request)
    safe_request['challengeAnswer'] = f"{str(request['challengeAnswer'])[0:10]}...<redacted>"
    safe_event = dict(event)
    safe_event['request'] = safe_request
    return safe_event


# https://docs.aws.amazon.com/cognito/latest/developerguide/user-pool-lambda-pre-authentication.html
def lambda_handler(event, _):
    """Lambda entry point: validate the caller and dispatch on the Cognito trigger.

    Args:
        event: The Cognito trigger event. Reads ``callerContext.clientId``,
            ``request.userNotFound``, ``userName``, ``userPoolId`` and
            ``triggerSource``.
        _: The Lambda context object (unused).

    Returns:
        The event as returned by the matching trigger handler. For an
        unhandled ``triggerSource`` the ``response`` key is removed from the
        event before it is returned, to force an error on the Cognito side.

    Raises:
        KeyError: if the ``COGNITO_CLIENT_ID`` environment variable is not set.
        Exception: if the calling app client is not the configured one (nor
            ``CLIENT_ID_NOT_APPLICABLE``), or if the request is flagged
            ``userNotFound``; the latter message carries the
            ``[USER_NOT_FOUND]`` tag the client looks for.
    """
    print('RECEIVED')
    # The challenge answer is the user's credential: log a redacted copy only.
    print(_redact_challenge_answer(event))

    COGNITO_CLIENT_ID = os.environ['COGNITO_CLIENT_ID']
    client_id = event['callerContext']['clientId']
    if client_id not in (COGNITO_CLIENT_ID, 'CLIENT_ID_NOT_APPLICABLE'):
        raise Exception(f'Cannot authenticate users from this user pool app client: {client_id}')

    # when user does not exist, reject the request
    if 'userNotFound' in event['request'] and event['request']['userNotFound']:
        # the [USER_NOT_FOUND] tag is important to let client know the error,
        # I could not find a way to capture a specific exception in the client
        raise Exception(f"User {event['userName']} does not exist in pool {event['userPoolId']}, " +
                        f"please signup first. [USER_NOT_FOUND]")

    result = event
    trigger = event['triggerSource']
    if trigger == 'DefineAuthChallenge_Authentication':
        result = define_auth_challenge(event)
    elif trigger == 'CreateAuthChallenge_Authentication':
        result = create_auth_challenge(event)
    elif trigger == 'VerifyAuthChallengeResponse_Authentication':
        result = verify_auth_challenge_response(event)
    elif trigger == 'PreSignUp_SignUp':
        result = pre_signup(event)
    else:
        print(f"WARNING - Cognito Event {event['triggerSource']} not handled")
        # force an error on Cognito
        del result['response']

    print('RESPONSE')
    print(_redact_challenge_answer(result))
    return result
@sebsto

This comment has been minimized.

Copy link
Owner Author

@sebsto sebsto commented Feb 15, 2021

On the client side, from an iOS application

// authentication succeeded
    /// Called when Sign in with Apple completes with an authorization.
    ///
    /// Extracts the identity token from the Apple ID credential, records the
    /// user's details in `UserData.shared`, then signs in to the backend with
    /// the token prefixed by `Apple:::`. When the backend reports that the user
    /// does not exist, the user is signed up first and the sign-in is repeated.
    ///
    /// - Parameters:
    ///   - controller: The authorization controller that produced the result.
    ///   - authorization: The authorization whose credential is inspected.
    func authorizationController(controller: ASAuthorizationController, didCompleteWithAuthorization authorization: ASAuthorization) {

        // Each precondition logs its own message, so the guards stay separate.
        guard let credential = authorization.credential as? ASAuthorizationAppleIDCredential else {
            self.logger.error("credentials not found....")
            return
        }

        guard let tokenData = credential.identityToken else {
            self.logger.error("Unable to fetch identity token")
            return
        }

        guard let idTokenString = String(data: tokenData, encoding: .utf8) else {
            self.logger.error("Unable to serialize token string from data: \(tokenData.debugDescription)")
            return
        }

        // The backend expects "<provider>:::<token>"; build it once for both sign-in attempts.
        let providerToken = "Apple:::" + idTokenString

        // Record the user details in UserData (per the original note, UserData
        // persists them in the keychain).
        let userData = UserData.shared
        let fullName = credential.fullName
        userData.familyName = fullName?.familyName ?? ""
        userData.firstName = fullName?.givenName ?? ""
        userData.email = credential.email ?? "noemail@email.com"
        userData.userId = credential.user

        Backend.shared.signIn(username: userData.userId, token: providerToken) { signInResult in
            switch signInResult {
            case .success:
                self.logger.info("SignIn Completed - Success")

            case .failed:
                self.logger.error("SignIn Failed")

            case .userNotFound:
                // First visit for this Apple ID: create the account, then sign in again.
                self.logger.info("User does not exist, starting signup sequence")
                Backend.shared.signUp(username: userData.userId, email: userData.email) { signUpResult in
                    switch signUpResult {
                    case .failed:
                        self.logger.error("SignUp failed, it's probably a bug, inform user")
                        // TODO inform user

                    case .success:
                        self.logger.info("Signup successful, now signing in")
                        Backend.shared.signIn(username: userData.userId, token: providerToken) { retryResult in
                            switch retryResult {
                            case .success:
                                self.logger.info("SignIn Completed - Success")
                            case .failed:
                                self.logger.error("SignIn Failed")
                            case .userNotFound:
                                // The account was created a moment ago, so this is treated as unreachable.
                                fatalError("It is not possible to not find the user; we just finished signUp")
                            }
                        }
                    }
                }
            }
        }
    }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment