Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
Get password from DynamoDB and decrypt with KMS
import base64
import uuid
import httplib
import urlparse
import json
import boto3
import string
import random
def get_password_from_dynamodb(env, stack_name, table_name):
"""
Function to get the password out of dynamodb
:param env: environment name
:param stack_name: name of the stack
:param table_name: name of the table storing all the DB passwords etc.
:return: string
"""
client = boto3.resource('dynamodb')
table = client.Table(table_name)
response = table.get_item(
Key={
'env': env,
'stackname': stack_name
}
)
if 'Item' in response:
if 'passwordbase64' in response['Item']:
return response['Item']['passwordbase64']
else:
return "No such attribute : passwordbase64"
else:
return "No key found"
def decrypt_password_from_b64(password_b64):
"""
Function to encrypt the password with KMS
:param password_b64: b64 string of the encrypted password
"""
password_encrypted = base64.b64decode(password_b64)
client = boto3.client('kms')
password = client.decrypt(CiphertextBlob=password_encrypted)
return password['Plaintext']
def send_response(request, response, status=None, reason=None):
"""
:param request: CF lambda settings
:param response: object containing the response values
:param status: to report to CloudFormation
:param reason: Message to report to CloudFormation to explain the status
:return: Object with all the reponse objects
"""
if status is not None:
response['Status'] = status
if reason is not None:
response['Reason'] = reason
if 'ResponseURL' in request and request['ResponseURL']:
try:
url = urlparse.urlparse(request['ResponseURL'])
body = json.dumps(response)
https = httplib.HTTPSConnection(url.hostname)
https.request('PUT', url.path + '?' + url.query, body)
except:
print("Failed to send the message to CF")
return response
def lambda_handler(event, context):
"""
Core function called when Lambda is invoked
:param event: The Lambda event params
:param context: The Lambda context params
:return: Return the response to CloudFormation
"""
response = {
'StackId': event['StackId'],
'RequestId': event['RequestId'],
'LogicalResourceId': event['LogicalResourceId'],
'Status': 'SUCCESS'
}
if 'PhysicalResourceId' in event:
response['PhysicalResourceId'] = event['PhysicalResourceId']
else:
response['PhysicalResourceId'] = str(uuid.uuid4())
if event['RequestType'] == 'Delete':
return send_response(event, response)
if event['RequestType'] == 'Update':
return send_response(event, response)
for key in ['Env', 'StackName']:
if key not in event['ResourceProperties'] or not event['ResourceProperties'][key]:
return send_response(
event, response, status='FAILED',
reason='The properties Env and StackName must not be empty'
)
password_b64 = get_password_from_dynamodb(event['ResourceProperties']['Env'],
event['ResourceProperties']['StackName'],
event['ResourceProperties']['TableName'])
password = decrypt_password_from_b64(password_b64)
response['Data'] = {'password': password}
response['Reason'] = 'The value was successfully encrypted'
return send_response(event, response)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment