Get password from DynamoDB and decrypt with KMS
import base64
import uuid
import httplib
import urlparse
import json
import boto3
import string
import random
def get_password_from_dynamodb(env, stack_name, table_name):
Function to get the password out of dynamodb
:param env: environment name
:param stack_name: name of the stack
:param table_name: name of the table storing all the DB passwords etc.
:return: string
client = boto3.resource('dynamodb')
table = client.Table(table_name)
response = table.get_item(
'env': env,
'stackname': stack_name
if 'Item' in response:
if 'passwordbase64' in response['Item']:
return response['Item']['passwordbase64']
return "No such attribute : passwordbase64"
return "No key found"
def decrypt_password_from_b64(password_b64):
Function to encrypt the password with KMS
:param password_b64: b64 string of the encrypted password
password_encrypted = base64.b64decode(password_b64)
client = boto3.client('kms')
password = client.decrypt(CiphertextBlob=password_encrypted)
return password['Plaintext']
def send_response(request, response, status=None, reason=None):
:param request: CF lambda settings
:param response: object containing the response values
:param status: to report to CloudFormation
:param reason: Message to report to CloudFormation to explain the status
:return: Object with all the reponse objects
if status is not None:
response['Status'] = status
if reason is not None:
response['Reason'] = reason
if 'ResponseURL' in request and request['ResponseURL']:
url = urlparse.urlparse(request['ResponseURL'])
body = json.dumps(response)
https = httplib.HTTPSConnection(url.hostname)
https.request('PUT', url.path + '?' + url.query, body)
print("Failed to send the message to CF")
return response
def lambda_handler(event, context):
Core function called when Lambda is invoked
:param event: The Lambda event params
:param context: The Lambda context params
:return: Return the response to CloudFormation
response = {
'StackId': event['StackId'],
'RequestId': event['RequestId'],
'LogicalResourceId': event['LogicalResourceId'],
'Status': 'SUCCESS'
if 'PhysicalResourceId' in event:
response['PhysicalResourceId'] = event['PhysicalResourceId']
response['PhysicalResourceId'] = str(uuid.uuid4())
if event['RequestType'] == 'Delete':
return send_response(event, response)
if event['RequestType'] == 'Update':
return send_response(event, response)
for key in ['Env', 'StackName']:
if key not in event['ResourceProperties'] or not event['ResourceProperties'][key]:
return send_response(
event, response, status='FAILED',
reason='The properties Env and StackName must not be empty'
password_b64 = get_password_from_dynamodb(event['ResourceProperties']['Env'],
password = decrypt_password_from_b64(password_b64)
response['Data'] = {'password': password}
response['Reason'] = 'The value was successfully encrypted'
return send_response(event, response)
