Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
Generate, encrypt, and store a password in DynamoDB
import base64
import uuid
import httplib
import urlparse
import json
import boto3
import string
import random
def encrypt_password_to_b64(key_id=None, password=None):
    """
    Encrypt a password with KMS and base64-encode the resulting ciphertext

    :param key_id: KMS Key Id handed to the ``encrypt`` call
    :param password: plaintext password to encrypt
    :return: base64 encoding of the ``CiphertextBlob`` returned by KMS
    """
    kms = boto3.client('kms')
    result = kms.encrypt(KeyId=key_id, Plaintext=password)
    return base64.b64encode(result['CiphertextBlob'])
def put_password_in_dynamodb(env, stack_name, table_name, password):
    """
    Write the encrypted password as one item of a DynamoDB table

    :param env: String for the environment name, stored under ``env``
    :param stack_name: Name of the cloudformation stack, stored under ``stackname``
    :param table_name: Name of the DynamoDB table receiving the item
    :param password: base64 encrypted password, stored under ``passwordbase64``
    """
    item = {
        'env': env,
        'stackname': stack_name,
        'passwordbase64': password,
    }
    # The put_item result is not needed by any caller, so it is not kept.
    boto3.resource('dynamodb').Table(table_name).put_item(Item=item)
def generate_random_password(password_length=20):
    """
    Generates a random password

    The password is made of ASCII letters, digits and '-', and always
    contains at least one '-'.

    :param password_length: Length of the password in number of characters.
        Anything accepted by ``int()`` works (e.g. the string '20').
    :return: String of the password
    :raises ValueError: if password_length cannot be converted to an integer
        or is smaller than 1
    """
    length = int(password_length)
    if length < 1:
        # A password of length 0 can never contain '-', so the loop below
        # would spin forever; reject it up front instead.
        raise ValueError('password_length must be at least 1')

    char_set = string.ascii_uppercase + string.ascii_lowercase + string.digits + '-'
    # SystemRandom draws from the OS entropy source; the default Mersenne
    # Twister generator is predictable and not suitable for secrets.
    rng = random.SystemRandom()

    password = ''
    # Redraw until at least one '-' is present. Characters are drawn with
    # replacement, so any length is supported.
    while '-' not in password:
        password = ''.join(rng.choice(char_set) for _ in range(length))
    return password
def send_response(request, response, status=None, reason=None):
    """
    Send our response to the pre-signed URL supplied by CloudFormation

    If no ResponseURL is found in the request, there is no place to send a
    response. This may be the case if the supplied event was for testing.
    Delivery is best effort: a failure is printed, never raised.

    :param request: dict of the incoming event; its 'ResponseURL' entry, when
        present and non-empty, is the URL the response is PUT to
    :param response: dict of the response; updated in place with status/reason
    :param status: optional value stored under response['Status']
    :param reason: optional value stored under response['Reason']
    :return: response object
    """
    if status is not None:
        response['Status'] = status
    if reason is not None:
        response['Reason'] = reason

    response_url = request.get('ResponseURL')
    if response_url:
        connection = None
        try:
            url = urlparse.urlparse(response_url)
            body = json.dumps(response)
            connection = httplib.HTTPSConnection(url.hostname)
            connection.request('PUT', url.path + '?' + url.query, body)
            # Read the reply so that a rejected upload is noticed instead of
            # being silently ignored.
            reply = connection.getresponse()
            reply.read()
            if reply.status >= 400:
                print("Failed to send the response to the provided URL: HTTP %s %s"
                      % (reply.status, reply.reason))
        except Exception as exc:
            # Best effort: report the cause but do not crash the caller.
            print("Failed to send the response to the provided URL: %s" % exc)
        finally:
            if connection is not None:
                connection.close()
    return response
def lambda_handler(event, context):
    """
    Core function called by Lambda

    Builds the response from the event, then:
    - for 'Delete' and 'Update' requests, reports SUCCESS without doing
      anything else;
    - otherwise generates a password, encrypts it with KMS and stores it in
      DynamoDB, reporting FAILED if a required property is missing or if any
      of those steps raises.

    :param event: Lambda event data. Must contain 'StackId', 'RequestId',
        'LogicalResourceId' and 'RequestType'. For requests other than
        Delete/Update, 'ResourceProperties' must provide non-empty 'KeyId',
        'PasswordLength', 'Env', 'StackName' and 'TableName'.
    :param context: Lambda defined context params (unused)
    :return: the response dict, as returned by send_response
    """
    response = {
        'StackId': event['StackId'],
        'RequestId': event['RequestId'],
        'LogicalResourceId': event['LogicalResourceId'],
        'Status': 'SUCCESS'
    }
    if 'PhysicalResourceId' in event:
        response['PhysicalResourceId'] = event['PhysicalResourceId']
    else:
        response['PhysicalResourceId'] = str(uuid.uuid4())

    # There is nothing to do for a delete or an update request
    if event['RequestType'] in ('Delete', 'Update'):
        return send_response(event, response)

    properties = event.get('ResourceProperties') or {}
    # Every property read below is checked up front, so a missing one is
    # reported as FAILED instead of raising KeyError before any response
    # has been sent.
    required = ('KeyId', 'PasswordLength', 'Env', 'StackName', 'TableName')
    missing = [key for key in required if not properties.get(key)]
    if missing:
        return send_response(event,
                             response,
                             status='FAILED',
                             reason='The properties %s must not be empty' % ', '.join(missing)
                             )

    try:
        db_password = generate_random_password(properties['PasswordLength'])
        encrypted_password = encrypt_password_to_b64(properties['KeyId'], db_password)
        put_password_in_dynamodb(properties['Env'],
                                 properties['StackName'],
                                 properties['TableName'],
                                 encrypted_password
                                 )
    except Exception as exc:
        # Top-level boundary: always answer the request, even on failure,
        # rather than letting the exception escape without a response.
        print("Failed to generate and store the password: %s" % exc)
        return send_response(event,
                             response,
                             status='FAILED',
                             reason='Failed to generate and store the password: %s' % exc
                             )

    response['Reason'] = 'The value was successfully encrypted'
    return send_response(event, response)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment