Generates, encrypt and store password in DynamoDB
import base64 | |
import uuid | |
import httplib | |
import urlparse | |
import json | |
import boto3 | |
import string | |
import random | |
def encrypt_password_to_b64(key_id=None, password=None): | |
""" | |
Function to encrypt the password with KMS | |
:param: key_id: KMS Key Id | |
:param: password to encrypt | |
:return: String password in base64 | |
""" | |
client = boto3.client('kms') | |
encrypted = client.encrypt( | |
KeyId=key_id, | |
Plaintext=password | |
) | |
encrypted_b64 = base64.b64encode(encrypted['CiphertextBlob']) | |
return encrypted_b64 | |
def put_password_in_dynamodb(env, stack_name, table_name, password): | |
""" | |
Function to store the b64 password in DynamoDB | |
:param env: String for the environment name (usually dev / prod etc.) | |
:param stack_name: Name of the cloudformation stack | |
:param password: string of the base64 encrypted password | |
""" | |
client = boto3.resource('dynamodb') | |
table = client.Table(table_name) | |
response = table.put_item( | |
Item={ | |
'env': env, | |
'stackname': stack_name, | |
'passwordbase64': password | |
} | |
) | |
def generate_random_password(password_length=20): | |
""" | |
Generates a random password sent back to CF | |
:param password_length: Length of the password in number of characters | |
:return: String of the password | |
""" | |
password = '' | |
char_set = string.ascii_uppercase + string.ascii_lowercase + string.digits + '-' | |
while '-' not in password: | |
password = ''.join(random.sample(char_set * 6, int(password_length))) | |
return password | |
def send_response(request, response, status=None, reason=None): | |
""" | |
Send our response to the pre-signed URL supplied by CloudFormation | |
If no ResponseURL is found in the request, there is no place to send a | |
response. This may be the case if the supplied event was for testing. | |
:return: response object | |
""" | |
if status is not None: | |
response['Status'] = status | |
if reason is not None: | |
response['Reason'] = reason | |
if 'ResponseURL' in request and request['ResponseURL']: | |
try: | |
url = urlparse.urlparse(request['ResponseURL']) | |
body = json.dumps(response) | |
https = httplib.HTTPSConnection(url.hostname) | |
https.request('PUT', url.path + '?' + url.query, body) | |
except: | |
print("Failed to send the response to the provdided URL") | |
return response | |
def lambda_handler(event, context): | |
""" | |
Core function called by Lambda | |
The function will determine what to do when called. | |
:param event: Lambda event data | |
:param context: Lambda defined context params | |
:return: Calls for send_response when the code could be executed without problem | |
""" | |
response = { | |
'StackId': event['StackId'], | |
'RequestId': event['RequestId'], | |
'LogicalResourceId': event['LogicalResourceId'], | |
'Status': 'SUCCESS' | |
} | |
if 'PhysicalResourceId' in event: | |
response['PhysicalResourceId'] = event['PhysicalResourceId'] | |
else: | |
response['PhysicalResourceId'] = str(uuid.uuid4()) | |
# There is nothing to do for a delete request | |
if event['RequestType'] == 'Delete': | |
return send_response(event, response) | |
if event['RequestType'] == 'Update': | |
return send_response(event, response) | |
for key in ['KeyId', 'PasswordLength']: | |
if key not in event['ResourceProperties'] or not event['ResourceProperties'][key]: | |
return send_response(event, | |
response, | |
status='FAILED', | |
reason='The properties KeyId and PasswordLength must not be empty' | |
) | |
db_password = generate_random_password(event['ResourceProperties']['PasswordLength']) | |
encrypted_password = encrypt_password_to_b64(event['ResourceProperties']['KeyId'], db_password) | |
put_password_in_dynamodb(event['ResourceProperties']['Env'], | |
event['ResourceProperties']['StackName'], | |
event['ResourceProperties']['TableName'], | |
encrypted_password | |
) | |
response['Reason'] = 'The value was successfully encrypted' | |
return send_response(event, response) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This comment has been minimized.
thanks, really helpful