Skip to content

Instantly share code, notes, and snippets.

@jgautheron
Forked from JohnPreston/get_password.py
Created July 14, 2017 15:34
Show Gist options
  • Save jgautheron/063e9f15773a0f23f4c86b8d59533a12 to your computer and use it in GitHub Desktop.
Save jgautheron/063e9f15773a0f23f4c86b8d59533a12 to your computer and use it in GitHub Desktop.
Get password from DynamoDB and decrypt with KMS
import base64
import uuid
import httplib
import urlparse
import json
import boto3
import string
import random
def get_password_from_dynamodb(env, stack_name, table_name):
    """
    Look up the stored password of an environment/stack pair in DynamoDB.

    Failures are reported through the return value rather than by raising,
    so callers have to compare the result against the two messages below.

    :param env: value of the ``env`` key attribute of the item
    :param stack_name: value of the ``stackname`` key attribute of the item
    :param table_name: name of the table storing all the DB passwords etc.
    :return: the item's ``passwordbase64`` attribute, or the string
        "No key found" when no item matches the key, or the string
        "No such attribute : passwordbase64" when the item lacks it
    """
    dynamodb = boto3.resource('dynamodb')
    lookup_key = {'env': env, 'stackname': stack_name}
    result = dynamodb.Table(table_name).get_item(Key=lookup_key)

    # Guard clauses: bail out with the matching message as soon as
    # something is missing.
    if 'Item' not in result:
        return "No key found"
    item = result['Item']
    if 'passwordbase64' not in item:
        return "No such attribute : passwordbase64"
    return item['passwordbase64']
def decrypt_password_from_b64(password_b64):
    """
    Decrypt a base64-encoded, KMS-encrypted password.

    The input is base64-decoded first and the resulting bytes are handed to
    KMS ``decrypt`` as the ciphertext blob.

    :param password_b64: b64 string of the encrypted password
    :return: the ``Plaintext`` field of the KMS decrypt response
    """
    ciphertext = base64.b64decode(password_b64)
    kms = boto3.client('kms')
    return kms.decrypt(CiphertextBlob=ciphertext)['Plaintext']
def send_response(request, response, status=None, reason=None):
    """
    Finalise the response object and upload it to CloudFormation.

    The upload is best effort: any failure is printed and swallowed so that
    the response object is always handed back to the caller.

    :param request: CF lambda settings; the response is PUT to its
        ``ResponseURL`` entry when that entry is present and non-empty
    :param response: object containing the response values (updated in place)
    :param status: to report to CloudFormation; overrides ``Status`` when set
    :param reason: Message to report to CloudFormation to explain the status;
        overrides ``Reason`` when set
    :return: Object with all the response objects (the same ``response``)
    """
    if status is not None:
        response['Status'] = status
    if reason is not None:
        response['Reason'] = reason

    response_url = request.get('ResponseURL')
    if not response_url:
        # Nowhere to report to: just return the response.
        return response

    connection = None
    try:
        url = urlparse.urlparse(response_url)
        body = json.dumps(response)
        connection = httplib.HTTPSConnection(url.hostname)
        connection.request('PUT', url.path + '?' + url.query, body)
        # Read the reply so a rejected upload is noticed instead of being
        # silently dropped when the connection goes away.
        reply = connection.getresponse()
        reply.read()
        if reply.status >= 300:
            print("Failed to send the message to CF: HTTP %s %s"
                  % (reply.status, reply.reason))
    except Exception as err:
        # Deliberately broad: reporting must never crash the handler, but
        # unlike a bare ``except`` this lets SystemExit/KeyboardInterrupt
        # through and keeps the cause visible in the logs.
        print("Failed to send the message to CF: %s" % err)
    finally:
        if connection is not None:
            connection.close()
    return response
def lambda_handler(event, context):
    """
    Core function called when Lambda is invoked

    Builds the CloudFormation response for the request, and for requests
    other than ``Delete``/``Update`` fetches the password from DynamoDB,
    decrypts it and returns it under ``Data['password']``. Every failure
    while validating, looking up or decrypting is reported to CloudFormation
    as ``FAILED`` instead of escaping as an unhandled exception.

    :param event: The Lambda event params; ``ResourceProperties`` must carry
        non-empty ``Env``, ``StackName`` and ``TableName``
    :param context: The Lambda context params (unused)
    :return: Return the response to CloudFormation
    """
    response = {
        'StackId': event['StackId'],
        'RequestId': event['RequestId'],
        'LogicalResourceId': event['LogicalResourceId'],
        'Status': 'SUCCESS',
    }
    if 'PhysicalResourceId' in event:
        response['PhysicalResourceId'] = event['PhysicalResourceId']
    else:
        response['PhysicalResourceId'] = str(uuid.uuid4())

    # Nothing is created by this resource, so Delete and Update are
    # acknowledged without doing any work.
    # NOTE(review): an Update is answered without a Data payload, so the
    # password is not re-read on updates - confirm this is intended.
    if event['RequestType'] in ('Delete', 'Update'):
        return send_response(event, response)

    properties = event.get('ResourceProperties') or {}
    # TableName is required too: it is passed to the DynamoDB lookup below.
    required = ('Env', 'StackName', 'TableName')
    if any(not properties.get(key) for key in required):
        return send_response(
            event, response, status='FAILED',
            reason='The properties Env, StackName and TableName '
                   'must not be empty'
        )

    try:
        password_b64 = get_password_from_dynamodb(properties['Env'],
                                                  properties['StackName'],
                                                  properties['TableName'])
    except Exception as err:
        # Top-level boundary: report the failure rather than crash, so
        # CloudFormation is told about it.
        return send_response(
            event, response, status='FAILED',
            reason='Failed to read the password from DynamoDB: %s' % err
        )

    # get_password_from_dynamodb reports a failed lookup by returning one of
    # these messages instead of raising; they must not be decrypted.
    lookup_errors = ("No key found", "No such attribute : passwordbase64")
    if password_b64 in lookup_errors:
        return send_response(event, response, status='FAILED',
                             reason=password_b64)

    try:
        password = decrypt_password_from_b64(password_b64)
    except Exception as err:
        return send_response(
            event, response, status='FAILED',
            reason='Failed to decrypt the password: %s' % err
        )

    response['Data'] = {'password': password}
    response['Reason'] = 'The value was successfully decrypted'
    return send_response(event, response)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment