-
-
Save Robert-Labrada/55350cd815049c20faa375f5340546bc to your computer and use it in GitHub Desktop.
Secret Rotation Examples for GCP Secret Manager
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import functions_framework | |
import json | |
import logging | |
import secrets | |
import string | |
import psycopg2 | |
from google.cloud import secretmanager | |
from google.api_core.exceptions import NotFound | |
# Use the root logger at INFO level so the rotation progress messages below
# are emitted.
logger = logging.getLogger()
logger.setLevel(level=logging.INFO)
@functions_framework.cloud_event
def rotation(cloud_event):
    """Rotate a database credential secret in response to a Pub/Sub event.

    When the message is a ``SECRET_ROTATE`` notification, the four rotation
    steps run in order: create, set, test, finish. Any other message is
    ignored.

    Args:
        cloud_event: CloudEvent whose ``data["message"]["attributes"]`` is
            read for ``eventType`` and ``secretId``.

    Raises:
        ValueError: If ``secretId`` is not in the
            ``projects/{PROJECT_ID}/secrets/{secret_id}`` format.
    """
    # A message may arrive without any attributes; use .get() so such a
    # message is ignored instead of failing with a KeyError.
    attributes = cloud_event.data["message"].get("attributes") or {}

    # Only rotate if we're receiving a SECRET_ROTATE event.
    if attributes.get("eventType") != "SECRET_ROTATE":
        return

    # SecretId will be in the "projects/{PROJECT_ID}/secrets/{secret_id}" format.
    secret_name = attributes.get("secretId", "")
    project_id, secret_id = parse_project_and_secret_id(secret_name)
    if not project_id or not secret_id:
        # parse_project_and_secret_id returns None for missing segments;
        # stop here rather than building "projects/None/..." resource names.
        raise ValueError(
            "secretId %r is not in the projects/{PROJECT_ID}/secrets/{secret_id} format"
            % secret_name
        )

    # Create the Secret Manager client.
    service_client = secretmanager.SecretManagerServiceClient()

    create_secret(service_client, project_id, secret_id)
    set_secret(service_client, project_id, secret_id)
    test_secret(service_client, project_id, secret_id)
    finish_secret(service_client, project_id, secret_id)
def create_secret(service_client, project_id, secret_id):
    """Generate a new PENDING secret version if none exists yet.

    Reads the CURRENT secret, then looks for a PENDING one. If the PENDING
    lookup raises ``NotFound``, a copy of the CURRENT secret with the
    alternate username and a fresh random password is added as a new version
    and the ``PENDING`` alias is pointed at it.

    Args:
        service_client (client): The secrets manager service client
        project_id (string): The project_id or other project identifier
        secret_id (string): The secret_id or other secret identifier

    Raises:
        ValueError: If the current secret is not valid JSON
        KeyError: If the secret json does not contain the expected keys
    """
    current_dict = get_secret_dict(service_client, project_id, secret_id, "CURRENT")

    try:
        get_secret_dict(service_client, project_id, secret_id, "PENDING")
    except NotFound:
        pending_exists = False
    else:
        pending_exists = True

    if pending_exists:
        # A PENDING version is already staged; nothing to create.
        return

    # Same payload as CURRENT, but for the alternate user with a new password.
    current_dict["username"] = get_alt_username(current_dict["username"])
    current_dict["password"] = get_random_password()
    payload = json.dumps(current_dict)

    secret_name = f"projects/{project_id}/secrets/{secret_id}"
    created_version = add_secret_version(service_client, secret_name, payload)

    # Point the PENDING alias at the version that was just added, keeping
    # every other alias already on the secret.
    aliases = dict(get_secret(service_client, project_id, secret_id).version_aliases)
    aliases["PENDING"] = extract_version(created_version.name)
    update_secret_aliases(service_client, project_id, secret_id, aliases)
def set_secret(service_client, project_id, secret_id):
    """Set the pending secret in the database.

    Tries to log in with the PENDING secret and returns on success. Otherwise
    it checks that the PENDING user and host are consistent with CURRENT,
    confirms the CURRENT credentials still work, and then logs in with the
    secret named by ``masterUser`` in the CURRENT secret. With that
    connection it either creates the PENDING role (granting it the CURRENT
    role) or resets the password of the existing PENDING role.

    Args:
        service_client (client): The secrets manager service client
        project_id (string): The project_id or other project identifier
        secret_id (string): The secret_id or other secret identifier

    Raises:
        ValueError: If the secret is not valid JSON, the PENDING user/host do
            not match CURRENT, or the current/master credentials could not be
            used to log in to the database
        KeyError: If the secret json does not contain the expected keys
    """
    current_dict = get_secret_dict(service_client, project_id, secret_id, "CURRENT")
    pending_dict = get_secret_dict(service_client, project_id, secret_id, "PENDING")

    # First try to login with the pending secret, if it succeeds, return
    conn = get_connection(pending_dict)
    if conn:
        conn.close()
        logger.info(
            "setSecret: PENDING secret is already set as password in PostgresSQL DB for secret %s",
            secret_id,
        )
        return

    # Make sure the user from current and pending match
    if get_alt_username(current_dict["username"]) != pending_dict["username"]:
        message = "Attempting to modify user %s other than current user or clone %s" % (
            pending_dict["username"],
            current_dict["username"],
        )
        logger.error("setSecret: %s", message)
        raise ValueError(message)

    # Make sure the host from current and pending match
    if current_dict["host"] != pending_dict["host"]:
        message = "Attempting to modify user for host %s other than current host %s" % (
            pending_dict["host"],
            current_dict["host"],
        )
        logger.error("setSecret: %s", message)
        raise ValueError(message)

    # Test CURRENT secret is valid by logging in
    conn = get_connection(current_dict)
    if not conn:
        message = (
            "Unable to log into database using current credentials for secret %s"
            % secret_id
        )
        logger.error("setSecret: %s", message)
        raise ValueError(message)
    # This connection only proves the CURRENT credentials work. Close it
    # before the name is reused for the master connection, otherwise it leaks.
    conn.close()

    # Using the master user secret
    master_user_secret_id = current_dict["masterUser"]
    super_user_dict = get_secret_dict(
        service_client, project_id, master_user_secret_id, "latest"
    )
    # Connect to the same database the rotated user lives in.
    super_user_dict["dbname"] = current_dict.get("dbname", None)

    conn = get_connection(super_user_dict)
    if not conn:
        message = (
            "Unable to log into database using credentials in master secret %s"
            % master_user_secret_id
        )
        logger.error("setSecret: %s", message)
        raise ValueError(message)

    try:
        with conn.cursor() as cur:
            # Role names cannot be bound as query parameters, so let the
            # server quote them with quote_ident() before interpolating.
            cur.execute("SELECT quote_ident(%s)", (pending_dict["username"],))
            pending_username = cur.fetchone()[0]
            cur.execute("SELECT quote_ident(%s)", (current_dict["username"],))
            current_username = cur.fetchone()[0]

            cur.execute(
                "SELECT 1 FROM pg_roles where rolname = %s", (pending_dict["username"],)
            )
            if not cur.fetchall():
                # Role does not exist yet: create it and grant it the
                # CURRENT role.
                create_role = "CREATE ROLE %s" % pending_username
                cur.execute(
                    create_role + " WITH LOGIN PASSWORD %s", (pending_dict["password"],)
                )
                cur.execute("GRANT %s TO %s" % (current_username, pending_username))
            else:
                # Role already exists: only reset its password.
                alter_role = "ALTER USER %s" % pending_username
                cur.execute(
                    alter_role + " WITH PASSWORD %s", (pending_dict["password"],)
                )
            conn.commit()
            logger.info(
                "setSecret: Successfully set password for %s in PostgreSQL DB for secret id %s.",
                pending_dict["username"],
                secret_id,
            )
    finally:
        conn.close()
def test_secret(service_client, project_id, secret_id):
    """Test the pending secret against the database.

    Logs into the database with the PENDING secret and runs ``SELECT NOW()``
    to confirm the connection is usable.

    Args:
        service_client (client): The secrets manager service client
        project_id (string): The project_id or other project identifier
        secret_id (string): The secret_id or other secret identifier

    Raises:
        ValueError: If the secret is not valid JSON or pending credentials could not be used to login to the database
        KeyError: If the secret json does not contain the expected keys
    """
    pending_dict = get_secret_dict(service_client, project_id, secret_id, "PENDING")

    conn = get_connection(pending_dict)
    if not conn:
        logger.error(
            "testSecret: Unable to log into database with pending secret of secret_id %s",
            secret_id,
        )
        raise ValueError(
            "Unable to log into database with pending secret of secret %s" % secret_id
        )

    try:
        with conn.cursor() as cur:
            cur.execute("SELECT NOW()")
            # The original referenced conn.commit without calling it, which
            # did nothing; actually end the transaction here.
            conn.commit()
        logger.info(
            "testSecret: Successfully signed into PostgreSQL DB with PENDING secret in %s",
            secret_id,
        )
    finally:
        # Single close on every path (the original closed the connection twice).
        conn.close()
def finish_secret(service_client, project_id, secret_id):
    """Finish the rotation by marking the pending secret as current.

    Re-points the ``CURRENT`` version alias at the version that ``PENDING``
    refers to, drops the ``PENDING`` alias, and writes the alias map back.

    Args:
        service_client (client): The secrets manager service client
        project_id (string): The project_id or other project identifier
        secret_id (string): The secret_id or other secret identifier

    Raises:
        KeyError: If the secret has no ``PENDING`` alias
    """
    metadata = get_secret(service_client, project_id, secret_id)
    aliases = dict(metadata.version_aliases)

    # Removing PENDING and assigning its version to CURRENT in one step.
    aliases["CURRENT"] = aliases.pop("PENDING")

    update_secret_aliases(service_client, project_id, secret_id, aliases)
def get_alt_username(current_username):
    """Return the alternate username used by the two-user rotation scheme.

    A name ending in ``_clone`` has that suffix removed; any other name has
    ``_clone`` appended.

    Args:
        current_username (string): The username to toggle

    Returns:
        string: The toggled username

    Raises:
        ValueError: If appending ``_clone`` would make the name longer than
            63 characters
    """
    suffix = "_clone"

    # Already the clone: strip the suffix to get back to the base user.
    if current_username.endswith(suffix):
        return current_username[: -len(suffix)]

    candidate = current_username + suffix
    if len(candidate) > 63:
        raise ValueError(
            "Unable to clone user, username length with _clone appended would exceed 63 characters"
        )
    return candidate
def get_random_password(length: int = 32) -> str:
    """Return a random password of ``length`` ASCII letters and digits.

    Characters are drawn with ``secrets.choice``.
    """
    alphabet = string.ascii_letters + string.digits
    chars = []
    for _ in range(length):
        chars.append(secrets.choice(alphabet))
    return "".join(chars)
def get_secret_dict(service_client, project_id, secret_id, version):
    """Fetch a secret version and return its JSON payload.

    Args:
        service_client (client): The secrets manager service client
        project_id (string): The project_id or other project identifier
        secret_id (string): The secret_id or other secret identifier
        version (string): Version id or alias to read (e.g. "CURRENT")

    Returns:
        The object decoded from the secret's JSON payload

    Raises:
        ValueError: If the payload is not valid JSON
        KeyError: If "host", "username" or "password" is missing
    """
    raw_payload = access_secret_version(
        service_client, project_id, secret_id, version_id=version
    )
    secret_dict = json.loads(raw_payload)

    # Report the first required key (in this order) that is absent.
    missing = [
        key for key in ("host", "username", "password") if key not in secret_dict
    ]
    if missing:
        raise KeyError("%s key is missing from secret JSON" % missing[0])

    return secret_dict
def extract_version(path):
    """Return the integer that follows the ``versions`` segment of ``path``.

    Args:
        path (string): A "/"-separated resource name, e.g.
            "projects/p/secrets/s/versions/7"

    Raises:
        ValueError: If there is no ``versions`` segment or the segment after
            it is not an integer
        IndexError: If ``versions`` is the last segment
    """
    segments = path.split("/")
    # The version number is the segment immediately after "versions".
    return int(segments[segments.index("versions") + 1])
def add_secret_version(service_client, secret_id, payload):
    """Add a new version holding ``payload`` to a secret.

    Args:
        service_client (client): The secrets manager service client
        secret_id (string): Passed as the ``parent`` of the new version; the
            caller in this file passes "projects/{project_id}/secrets/{secret_id}"
        payload (string): Secret material; encoded as UTF-8 before sending

    Returns:
        The response from the client's ``add_secret_version`` call
    """
    # The API takes bytes, so encode the string payload first.
    encoded = payload.encode("UTF-8")

    new_version = service_client.add_secret_version(
        parent=secret_id,
        payload={"data": encoded},
    )

    # Log the new secret version name.
    logger.info(f"Added secret version: {new_version.name}")
    return new_version
def access_secret_version(service_client, project_id, secret_id, version_id="latest"):
    """Return the payload of a secret version decoded as UTF-8 text.

    Args:
        service_client (client): The secrets manager service client
        project_id (string): The project_id or other project identifier
        secret_id (string): The secret_id or other secret identifier
        version_id (string): Version number or alias; defaults to "latest"
    """
    # Full resource name of the secret version to read.
    version_name = (
        f"projects/{project_id}/secrets/{secret_id}/versions/{version_id}"
    )
    payload_bytes = service_client.access_secret_version(name=version_name).payload.data
    return payload_bytes.decode("UTF-8")
def get_secret(service_client, project_id, secret_id) -> secretmanager.Secret:
    """Get information about the given secret.

    This only returns metadata about the secret container, not any secret
    material.

    Args:
        service_client (client): The secrets manager service client
        project_id (string): The project_id or other project identifier
        secret_id (string): The secret_id or other secret identifier

    Returns:
        The secret returned by the client's ``get_secret`` call (the original
        annotation named the request type, not the returned resource).
    """
    # Build the resource name of the secret.
    name = service_client.secret_path(project_id, secret_id)

    # Get the secret.
    return service_client.get_secret(request={"name": name})
def update_secret_aliases(
    service_client, project_id, secret_id, aliases
) -> secretmanager.Secret:
    """Replace the version aliases of an existing secret.

    Only the ``version_aliases`` field is included in the update mask.

    Args:
        service_client (client): The secrets manager service client
        project_id (string): The project_id or other project identifier
        secret_id (string): The secret_id or other secret identifier
        aliases (dict): Complete alias-to-version mapping to store

    Returns:
        The secret returned by the client's ``update_secret`` call. The
        original returned nothing despite its annotation; callers that ignore
        the return value are unaffected.
    """
    # Build the resource name of the secret.
    name = service_client.secret_path(project_id, secret_id)

    # Update the secret, touching only its version aliases.
    secret = {"name": name, "version_aliases": aliases}
    update_mask = {"paths": ["version_aliases"]}
    response = service_client.update_secret(
        request={"secret": secret, "update_mask": update_mask}
    )

    # Log through the module's logger, like the other helpers in this file.
    logger.info("Updated secret aliases: %s", response.name)
    return response
def parse_project_and_secret_id(secret_name):
    """Split a secret resource name into its project and secret ids.

    Args:
        secret_name (string): Name in the
            "projects/{PROJECT_ID}/secrets/{secret_id}" format

    Returns:
        tuple: ``(project, secret_id)`` taken from the second and fourth
        "/"-separated segments; a segment that is absent yields ``None``.
    """
    segments = secret_name.split("/")

    def segment_or_none(index):
        # Positions past the end of the name are reported as None.
        return segments[index] if index < len(segments) else None

    return segment_or_none(1), segment_or_none(3)
def get_connection(secret_dict):
    """Open a PostgreSQL connection using the credentials in ``secret_dict``.

    Args:
        secret_dict (dict): Must contain "username", "password" and "host";
            "port" defaults to 5432 and "dbname" to "postgres" when absent
            (a "dbname" of None also falls back to "postgres")

    Returns:
        The psycopg2 connection, or None if ``psycopg2.connect`` raises
        ``OperationalError``
    """
    port = int(secret_dict.get("port", 5432))

    dbname = secret_dict.get("dbname")
    if dbname is None:
        dbname = "postgres"

    connect_kwargs = {
        "dbname": dbname,
        "user": secret_dict["username"],
        "password": secret_dict["password"],
        "host": secret_dict["host"],
        "port": port,
    }

    try:
        connection = psycopg2.connect(**connect_kwargs)
        logger.info(
            "Successfully established connection as user '%s' with host: '%s'"
            % (secret_dict["username"], secret_dict["host"])
        )
        return connection
    except psycopg2.OperationalError:
        # Callers treat None as "could not log in".
        return None
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
functions-framework==3.* | |
google-cloud-secret-manager>=2.0.0 | |
psycopg2-binary |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Shared settings referenced by the resources below.
locals {
  # Project and placement
  project_id = "<project_id>"
  region     = "us-central1"
  zone       = "us-central1-c"

  # Rotation Cloud Function: its name and the local path of its zipped source
  rotation_function_name = "secret-rotator"
  function_source_path   = "path/to/function.zip"
}
# Service Account used to build, run and trigger the rotation function.
resource "google_service_account" "account" {
  # Pin the project explicitly, as every other resource in this file does, so
  # the account is not created in whatever project the provider defaults to.
  project      = local.project_id
  account_id   = "secret-manager-sa"
  display_name = "Secret Manager Rotation Service Account"
}
# Secret Rotation Topic: referenced by the secret's `topics` block and by the
# function's event trigger.
resource "google_pubsub_topic" "secret_rotation_topic" {
  project = local.project_id
  name    = "secret-rotation-topic"
}
# Bucket for Cloud Function source
resource "google_storage_bucket" "rotation_function_bucket" {
  project  = local.project_id
  location = local.region

  # The project id is part of the bucket name.
  name = "rotation-function-source-${local.project_id}"

  uniform_bucket_level_access = true
}
# Function source: uploads the local zip archive into the source bucket.
resource "google_storage_bucket_object" "rotation_source_object" {
  bucket = google_storage_bucket.rotation_function_bucket.name
  name   = "${local.rotation_function_name}.zip"
  source = local.function_source_path
}
# Rotation function, triggered by messages published to the rotation topic.
resource "google_cloudfunctions2_function" "rotation_function" {
  project  = local.project_id
  location = local.region
  name     = local.rotation_function_name

  # Build: Python 3.12 runtime, entry point `rotation`, source taken from the
  # zip object uploaded to the source bucket.
  build_config {
    runtime         = "python312"
    entry_point     = "rotation"
    service_account = google_service_account.account.id

    source {
      storage_source {
        bucket = google_storage_bucket.rotation_function_bucket.name
        object = google_storage_bucket_object.rotation_source_object.name
      }
    }
  }

  # Runtime: scales between 0 and 3 instances, 60 second timeout.
  service_config {
    service_account_email = google_service_account.account.email
    ingress_settings      = "ALLOW_INTERNAL_ONLY"
    available_memory      = "256M"
    min_instance_count    = 0
    max_instance_count    = 3
    timeout_seconds       = 60

    environment_variables = {
      "LOG_EXECUTION_ID" = "true"
    }
  }

  # Trigger: Pub/Sub messages on the rotation topic; failed deliveries are
  # not retried.
  event_trigger {
    trigger_region        = local.region
    event_type            = "google.cloud.pubsub.topic.v1.messagePublished"
    pubsub_topic          = google_pubsub_topic.secret_rotation_topic.id
    service_account_email = google_service_account.account.email
    retry_policy          = "RETRY_POLICY_DO_NOT_RETRY"
  }
}
# Secret with a rotation schedule, wired to the rotation topic.
resource "google_secret_manager_secret" "rotation_secret" {
  project   = local.project_id
  secret_id = "rotation-secret"

  replication {
    auto {}
  }

  # Topic associated with this secret (the same one that triggers the function).
  topics {
    name = google_pubsub_topic.secret_rotation_topic.id
  }

  rotation {
    rotation_period    = "2592000s"             # 30 days
    next_rotation_time = "2090-10-01T00:00:00Z" # must be in the future
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import functions_framework | |
import logging | |
import secrets | |
import string | |
from google.cloud import secretmanager | |
# Use the root logger at INFO level so the rotation progress messages below
# are emitted.
logger = logging.getLogger()
logger.setLevel(level=logging.INFO)
def add_secret_version(secret_id, payload):
    """Add a new version holding ``payload`` to the given secret.

    Args:
        secret_id (string): Passed as the ``parent`` of the new version; the
            caller in this file passes the ``secretId`` message attribute
        payload (string): Secret material; encoded as UTF-8 before sending
    """
    # The API takes bytes, so encode the string payload first.
    encoded = payload.encode("UTF-8")

    # A new Secret Manager client is created on every call.
    new_version = secretmanager.SecretManagerServiceClient().add_secret_version(
        parent=secret_id,
        payload={"data": encoded},
    )

    # Log the new secret version name.
    logger.info(f"Added secret version: {new_version.name}")
def generate_random_secret(secret_length: int = 25) -> str:
    """Return a random string of ``secret_length`` ASCII letters and digits.

    Characters are drawn with ``secrets.choice``.
    """
    alphabet = string.ascii_letters + string.digits
    chars = []
    for _ in range(secret_length):
        chars.append(secrets.choice(alphabet))
    return "".join(chars)
@functions_framework.cloud_event
def rotation(cloud_event):
    """Add a freshly generated random value as a new version of a secret.

    Acts only on ``SECRET_ROTATE`` messages; any other message is ignored.

    Args:
        cloud_event: CloudEvent whose ``data["message"]["attributes"]`` is
            read for ``eventType`` and ``secretId``.

    Raises:
        ValueError: If a ``SECRET_ROTATE`` message carries no ``secretId``.
    """
    # A message may arrive without any attributes; use .get() so such a
    # message is ignored instead of failing with a KeyError.
    attributes = cloud_event.data["message"].get("attributes") or {}

    # Only rotate if we're receiving a SECRET_ROTATE event.
    if attributes.get("eventType") != "SECRET_ROTATE":
        return

    # Pull the secret id from the message.
    # SecretId will be in the "projects/{PROJECT_ID}/secrets/{secret_id}" format
    secret_id = attributes.get("secretId")
    if not secret_id:
        raise ValueError("SECRET_ROTATE event is missing the secretId attribute")

    logger.info(f"Rotating secret '{secret_id}'")
    add_secret_version(secret_id, generate_random_secret())
    logger.info("Rotation Complete")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment