Last active
November 18, 2023 08:29
-
-
Save JohnPreston/6c6dcd0e726219a10111182891f8547b to your computer and use it in GitHub Desktop.
Generates, encrypts and stores a password in DynamoDB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import base64 | |
import uuid | |
import httplib | |
import urlparse | |
import json | |
import boto3 | |
import string | |
import random | |
def encrypt_password_to_b64(key_id=None, password=None):
    """
    Encrypt a password with KMS and return the ciphertext as base64 text.

    :param key_id: KMS Key Id used for the encryption
    :param password: plaintext password to encrypt
    :return: String of the base64-encoded KMS ciphertext
    """
    client = boto3.client('kms')
    encrypted = client.encrypt(
        KeyId=key_id,
        Plaintext=password
    )
    encrypted_b64 = base64.b64encode(encrypted['CiphertextBlob'])
    # b64encode returns bytes on Python 3; normalise to the native string
    # type so the documented return type holds on both major versions
    # (on Python 2 the value is already a str and is returned unchanged).
    if not isinstance(encrypted_b64, str):
        encrypted_b64 = encrypted_b64.decode('ascii')
    return encrypted_b64
def put_password_in_dynamodb(env, stack_name, table_name, password):
    """
    Write the encrypted password of a stack as one item of a DynamoDB table.

    :param env: String for the environment name (usually dev / prod etc.)
    :param stack_name: Name of the cloudformation stack
    :param table_name: Name of the DynamoDB table the item is written to
    :param password: string of the base64 encrypted password
    """
    item = {
        'env': env,
        'stackname': stack_name,
        'passwordbase64': password,
    }
    boto3.resource('dynamodb').Table(table_name).put_item(Item=item)
def generate_random_password(password_length=20):
    """
    Generate a random password that contains at least one '-' character.

    Characters are drawn from ASCII letters, digits and '-'.

    :param password_length: Length of the password in number of characters
        (an int, or a value int() can convert such as a numeric string)
    :return: String of the password
    :raises ValueError: if password_length is a string that is not a valid
        integer, or if the resulting length is lower than 1
    """
    length = int(password_length)
    if length < 1:
        # A password shorter than one character can never contain '-',
        # so the retry loop below would never terminate.
        raise ValueError('password_length must be at least 1')
    char_set = string.ascii_uppercase + string.ascii_lowercase + string.digits + '-'
    # SystemRandom draws from the operating system's entropy source; the
    # module-level generator of `random` is not meant for secrets.
    rng = random.SystemRandom()
    password = ''
    # Redraw the whole password until it contains the mandatory '-'.
    while '-' not in password:
        password = ''.join(rng.choice(char_set) for _ in range(length))
    return password
def send_response(request, response, status=None, reason=None):
    """
    Send our response to the pre-signed URL supplied by CloudFormation

    If no ResponseURL is found in the request, there is no place to send a
    response. This may be the case if the supplied event was for testing.

    Delivery is best effort: a failure to reach the URL, or a non-2xx answer
    from it, is printed and does not raise.

    :param request: the event received by the Lambda function
    :param response: dict of the response; updated in place with status/reason
    :param status: optional value stored under the 'Status' key
    :param reason: optional value stored under the 'Reason' key
    :return: response object
    """
    if status is not None:
        response['Status'] = status
    if reason is not None:
        response['Reason'] = reason
    if 'ResponseURL' not in request or not request['ResponseURL']:
        return response

    connection = None
    try:
        url = urlparse.urlparse(request['ResponseURL'])
        body = json.dumps(response)
        # Only append the query separator when there is a query string.
        target = url.path or '/'
        if url.query:
            target += '?' + url.query
        # The timeout keeps a stalled endpoint from blocking the function
        # until the Lambda timeout is reached.
        connection = httplib.HTTPSConnection(url.hostname, url.port, timeout=10)
        connection.request('PUT', target, body)
        # Read the answer so a rejected upload is reported instead of being
        # silently ignored.
        reply = connection.getresponse()
        reply.read()
        if not 200 <= reply.status < 300:
            print("Failed to send the response to the provided URL: HTTP %s %s"
                  % (reply.status, reply.reason))
    except Exception as error:
        print("Failed to send the response to the provided URL: %s" % error)
    finally:
        if connection is not None:
            connection.close()
    return response
def lambda_handler(event, context):
    """
    Core function called by Lambda

    Builds the CloudFormation response for the event. For a request that is
    neither 'Delete' nor 'Update' it generates a password, encrypts it with
    KMS and stores the result in DynamoDB. Missing properties and errors
    raised while doing so are reported with a FAILED status rather than left
    unanswered.

    :param event: Lambda event data
    :param context: Lambda defined context params (not used)
    :return: the response dict returned by send_response
    """
    response = {
        'StackId': event['StackId'],
        'RequestId': event['RequestId'],
        'LogicalResourceId': event['LogicalResourceId'],
        'Status': 'SUCCESS'
    }
    if 'PhysicalResourceId' in event:
        response['PhysicalResourceId'] = event['PhysicalResourceId']
    else:
        response['PhysicalResourceId'] = str(uuid.uuid4())

    # There is nothing to do for a delete or an update request
    if event['RequestType'] in ('Delete', 'Update'):
        return send_response(event, response)

    properties = event.get('ResourceProperties') or {}
    # Every property read below must be validated up front: a KeyError raised
    # later would leave the request without any response.
    required = ('KeyId', 'PasswordLength', 'Env', 'StackName', 'TableName')
    missing = [key for key in required if not properties.get(key)]
    if missing:
        return send_response(event,
                             response,
                             status='FAILED',
                             reason='The properties %s must not be empty' % ', '.join(missing)
                             )

    try:
        db_password = generate_random_password(properties['PasswordLength'])
        encrypted_password = encrypt_password_to_b64(properties['KeyId'], db_password)
        put_password_in_dynamodb(properties['Env'],
                                 properties['StackName'],
                                 properties['TableName'],
                                 encrypted_password
                                 )
    except Exception as error:
        # Report the failure instead of letting the exception escape without
        # a response. Only the exception type goes into the reason; the full
        # message is printed to the function's output.
        print('Failed to generate and store the password: %s' % error)
        return send_response(event,
                             response,
                             status='FAILED',
                             reason='Failed to generate and store the password (%s)'
                                    % type(error).__name__
                             )

    response['Reason'] = 'The value was successfully encrypted'
    return send_response(event, response)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
thanks, really helpful