Skip to content

Instantly share code, notes, and snippets.

@fcavalcantirj
Last active August 25, 2018 17:08
Show Gist options
  • Save fcavalcantirj/f7ff842706d64eb6fdb40bf02fa6f44b to your computer and use it in GitHub Desktop.
Save fcavalcantirj/f7ff842706d64eb6fdb40bf02fa6f44b to your computer and use it in GitHub Desktop.
Lambda function to refresh a user's access token using a refreshToken and the initiate_auth API.
"""This is a function to get a new accessToken based on the refreshToken.
Usage::
Just create a resource and a method on the API Gateway,
integrate the request with this Lambda function via the console, and test it.
JSON used to test:
{
"refreshToken":"something"
}
"""
from __future__ import print_function
import boto3
import botocore.exceptions
import hmac
import hashlib
import base64
import json
import uuid
import logging
# Cognito settings (placeholder values). USER_POOL_ID is not referenced by
# the code visible in this file.
# NOTE(review): CLIENT_SECRET is hard-coded in source — consider loading it
# from configuration or a secrets store instead.
USER_POOL_ID = 'us-east-2_blah-blah-blah'
CLIENT_ID = 'blah-blah-blah'
CLIENT_SECRET = 'blah-blah-blah-really-long-string'
# boto3 'cognito-idp' client; left as None here and created on first use
# inside lambda_handler.
client = None
# Root logger, configured to emit INFO and above.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def get_secret_hash(username):
    """Return the base64-encoded HMAC-SHA256 of ``username + CLIENT_ID``.

    The module-level ``CLIENT_SECRET`` is used as the HMAC key. The result
    is returned as a text string (the base64 bytes decoded).
    """
    payload = str(username + CLIENT_ID).encode('utf-8')
    key_bytes = str(CLIENT_SECRET).encode('utf-8')
    mac = hmac.new(key_bytes, msg=payload, digestmod=hashlib.sha256)
    return base64.b64encode(mac.digest()).decode()
def refreshToken(refreshToken):
    """Request new tokens from Cognito using a refresh token.

    Calls ``initiate_auth`` on the module-level ``client`` with the
    ``REFRESH_TOKEN_AUTH`` flow.

    Returns a ``(response, error)`` pair:
      * ``(resp, None)`` when the call succeeds;
      * ``(None, "Unauthorized")`` on NotAuthorizedException or
        UserNotFoundException;
      * ``(None, "Unknown error")`` on any other exception, which is logged.
    """
    try:
        auth_response = client.initiate_auth(
            ClientId=CLIENT_ID,
            AuthFlow='REFRESH_TOKEN_AUTH',
            AuthParameters={
                'REFRESH_TOKEN': refreshToken,
                # NOTE(review): the raw client secret is sent as SECRET_HASH;
                # get_secret_hash() is not used here — confirm this is intended.
                'SECRET_HASH': CLIENT_SECRET,
            },
        )
    except (client.exceptions.NotAuthorizedException,
            client.exceptions.UserNotFoundException):
        # Both cases are reported to the caller the same way.
        return None, "Unauthorized"
    except Exception as err:
        logger.error(err)
        return None, "Unknown error"
    else:
        return auth_response, None
def lambda_handler(event, context):
    """Lambda entry point: exchange a refresh token for fresh tokens.

    Args:
        event: mapping that must contain the key ``'refreshToken'``.
        context: Lambda context object (unused).

    Returns:
        dict with ``status`` ('success'), ``id_token``, ``access_token`` and
        ``expires_in`` taken from the ``AuthenticationResult`` of the
        response.

    Raises:
        KeyError: if ``event`` has no ``'refreshToken'`` key.
        Exception: carrying the error message returned by ``refreshToken``
            when the refresh fails.
    """
    global client
    # Create the Cognito client once and reuse it on later invocations.
    if client is None:
        client = boto3.client('cognito-idp')

    refresh_token = event['refreshToken']
    resp, msg = refreshToken(refresh_token)
    if msg is not None:
        # Log only the failure reason: the refresh token is a credential and
        # must not be written to the logs.
        logger.info('failed to refreshToken: %s', msg)
        raise Exception(msg)

    auth_result = resp['AuthenticationResult']
    return {
        'status': 'success',
        'id_token': auth_result['IdToken'],
        'access_token': auth_result['AccessToken'],
        'expires_in': auth_result['ExpiresIn'],
    }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment