Skip to content

Instantly share code, notes, and snippets.

@JohnPreston
Last active July 1, 2019 23:22
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save JohnPreston/e98e5cfcc62b2236815574a84e58b42b to your computer and use it in GitHub Desktop.
Save JohnPreston/e98e5cfcc62b2236815574a84e58b42b to your computer and use it in GitHub Desktop.
Get password from DynamoDB and decrypt with KMS
import base64
import uuid
import httplib
import urlparse
import json
import boto3
import string
import random
def get_password_from_dynamodb(env, stack_name, table_name):
    """
    Fetch the base64-encoded password stored for a stack in DynamoDB.

    The item is looked up by the composite key ``env`` / ``stackname``.

    :param env: environment name (value of the ``env`` key attribute)
    :param stack_name: name of the stack (value of the ``stackname`` key attribute)
    :param table_name: name of the table storing all the DB passwords etc.
    :return: the item's ``passwordbase64`` attribute; when the item or the
        attribute is missing, a human-readable message string is returned
        instead (callers must tell the two apart themselves)
    """
    dynamodb = boto3.resource('dynamodb')
    lookup_key = {'env': env, 'stackname': stack_name}
    result = dynamodb.Table(table_name).get_item(Key=lookup_key)

    # get_item omits 'Item' entirely when no row matches the key.
    if 'Item' not in result:
        return "No key found"

    item = result['Item']
    if 'passwordbase64' not in item:
        return "No such attribute : passwordbase64"
    return item['passwordbase64']
def decrypt_password_from_b64(password_b64):
    """
    Decrypt a KMS-encrypted password supplied as a base64 string.

    :param password_b64: b64 string of the encrypted password
    :return: the ``Plaintext`` field of the KMS ``decrypt`` response
    """
    # Decode first so a malformed input fails before any AWS client is built.
    ciphertext = base64.b64decode(password_b64)
    kms = boto3.client('kms')
    return kms.decrypt(CiphertextBlob=ciphertext)['Plaintext']
def send_response(request, response, status=None, reason=None):
    """
    Report the outcome of a custom resource request to CloudFormation.

    ``response`` is updated in place with ``status`` / ``reason`` when they
    are given. If ``request`` carries a non-empty ``ResponseURL``, the
    response is JSON-encoded and sent to that URL with an HTTPS PUT.
    Delivery is best effort: a failure is printed, never raised.

    :param request: CF lambda settings (the Lambda event); ``ResponseURL``
        is used when present and non-empty
    :param response: object containing the response values
    :param status: to report to CloudFormation (stored under ``Status``)
    :param reason: Message to report to CloudFormation to explain the status
        (stored under ``Reason``)
    :return: the same ``response`` object, with all the response values
    """
    if status is not None:
        response['Status'] = status
    if reason is not None:
        response['Reason'] = reason

    response_url = request.get('ResponseURL')
    if not response_url:
        return response

    connection = None
    try:
        url = urlparse.urlparse(response_url)
        body = json.dumps(response)
        connection = httplib.HTTPSConnection(url.hostname)
        connection.request('PUT', url.path + '?' + url.query, body)
        # Read the reply so the upload is known to have completed and a
        # rejection by the endpoint does not go unnoticed.
        reply = connection.getresponse()
        reply.read()
        if reply.status >= 300:
            print("CF response endpoint replied %s %s" % (reply.status, reply.reason))
    except Exception as exc:
        # Deliberately best effort: log the cause but never raise, so the
        # caller still gets the response object back.
        print("Failed to send the message to CF")
        print(repr(exc))
    finally:
        # The connection is None when parsing or encoding failed before
        # it was created.
        if connection is not None:
            connection.close()
    return response
def lambda_handler(event, context):
    """
    Core function called when Lambda is invoked by CloudFormation.

    ``Delete`` and ``Update`` requests are acknowledged with SUCCESS and no
    data. For any other request type the properties ``Env``, ``StackName``
    and ``TableName`` are required; the password is read from DynamoDB,
    decrypted with KMS and returned under ``Data['password']``. Every
    failure is reported to CloudFormation as FAILED instead of being raised,
    so the stack does not sit waiting for a response that never arrives.

    :param event: The Lambda event params
    :param context: The Lambda context params (unused)
    :return: Return the response to CloudFormation
    """
    response = {
        'StackId': event['StackId'],
        'RequestId': event['RequestId'],
        'LogicalResourceId': event['LogicalResourceId'],
        'Status': 'SUCCESS',
    }
    # Keep the id CloudFormation already knows; mint one only when the
    # event does not carry it.
    if 'PhysicalResourceId' in event:
        response['PhysicalResourceId'] = event['PhysicalResourceId']
    else:
        response['PhysicalResourceId'] = str(uuid.uuid4())

    # Nothing to look up or clean up for these request types.
    if event['RequestType'] in ('Delete', 'Update'):
        return send_response(event, response)

    properties = event.get('ResourceProperties') or {}
    for key in ('Env', 'StackName', 'TableName'):
        if not properties.get(key):
            return send_response(
                event, response, status='FAILED',
                reason='The properties Env, StackName and TableName must not be empty'
            )

    # Messages get_password_from_dynamodb returns in place of a password
    # when the lookup fails; they must not be handed to KMS as ciphertext.
    lookup_errors = ("No key found", "No such attribute : passwordbase64")
    try:
        password_b64 = get_password_from_dynamodb(properties['Env'],
                                                  properties['StackName'],
                                                  properties['TableName'])
        if password_b64 in lookup_errors:
            raise ValueError(password_b64)
        password = decrypt_password_from_b64(password_b64)
    except Exception as exc:
        # Any AWS or decoding error must still produce a response.
        print("Failed to retrieve the password: %r" % (exc,))
        return send_response(
            event, response, status='FAILED',
            reason='Failed to retrieve the password: %s' % (exc,)
        )

    response['Data'] = {'password': password}
    response['Reason'] = 'The value was successfully decrypted'
    return send_response(event, response)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment