Skip to content

Instantly share code, notes, and snippets.

@Robert-Labrada
Created November 21, 2024 22:45
Show Gist options
  • Save Robert-Labrada/55350cd815049c20faa375f5340546bc to your computer and use it in GitHub Desktop.
Save Robert-Labrada/55350cd815049c20faa375f5340546bc to your computer and use it in GitHub Desktop.
Secret Rotation Examples for GCP Secret Manager
import functions_framework
import json
import logging
import secrets
import string
import psycopg2
from google.cloud import secretmanager
from google.api_core.exceptions import NotFound
logger = logging.getLogger()
logger.setLevel(logging.INFO)
@functions_framework.cloud_event
def rotation(cloud_event):
    """Cloud Function entry point: rotate a secret when a SECRET_ROTATE event arrives.

    Runs the four rotation steps in order (create, set, test, finish) for the secret
    named in the Pub/Sub message attributes. Messages with any other event type, or
    with no attributes at all, are ignored.

    Args:
        cloud_event (CloudEvent): The triggering event. Its ``data`` is read as
            ``data["message"]["attributes"]`` with ``eventType`` and ``secretId`` keys.

    Raises:
        ValueError: If ``secretId`` is missing or is not in the
            "projects/{PROJECT_ID}/secrets/{secret_id}" format.
    """
    # Look the attributes up defensively: a message published without attributes
    # should be skipped like any other non-rotation message, not fail with KeyError.
    message = cloud_event.data.get("message") or {}
    attributes = message.get("attributes") or {}

    # Only Rotate if we're receiving a SECRET_ROTATE Event
    if attributes.get("eventType") != "SECRET_ROTATE":
        return

    # SecretId will be in the "projects/{PROJECT_ID}/secrets/{secret_id}" format
    secret_name = attributes.get("secretId") or ""
    project_id, secret_id = parse_project_and_secret_id(secret_name)
    if not project_id or not secret_id:
        # Fail here with a clear message instead of sending "None" ids to the API.
        raise ValueError(
            "Unable to parse project and secret id from secretId %r" % secret_name
        )

    # Create the Secret Manager client.
    service_client = secretmanager.SecretManagerServiceClient()

    create_secret(service_client, project_id, secret_id)
    set_secret(service_client, project_id, secret_id)
    test_secret(service_client, project_id, secret_id)
    finish_secret(service_client, project_id, secret_id)
def create_secret(service_client, project_id, secret_id):
    """Stage a PENDING secret version unless one is already staged.

    The CURRENT version is read first. If the PENDING version can be read as well,
    nothing else happens. If reading PENDING raises NotFound, a copy of the CURRENT
    payload with the alternate username and a newly generated password is added as a
    new secret version, and the PENDING alias is pointed at that version.

    Args:
        service_client (client): The secrets manager service client
        project_id (string): The project_id or other project identifier
        secret_id (string): The secret_id or other secret identifier

    Raises:
        ValueError: If a secret payload is not valid JSON
        KeyError: If the secret json does not contain the expected keys
    """
    rotated = get_secret_dict(service_client, project_id, secret_id, "CURRENT")
    try:
        get_secret_dict(service_client, project_id, secret_id, "PENDING")
    except NotFound:
        # Nothing is staged yet: derive the next credentials from the current ones.
        rotated["username"] = get_alt_username(rotated["username"])
        rotated["password"] = get_random_password()

        parent = f"projects/{project_id}/secrets/{secret_id}"
        created = add_secret_version(service_client, parent, json.dumps(rotated))
        pending_version = extract_version(created.name)

        # Copy the existing aliases so only PENDING changes in the update.
        aliases = dict(get_secret(service_client, project_id, secret_id).version_aliases)
        aliases["PENDING"] = pending_version
        update_secret_aliases(service_client, project_id, secret_id, aliases)
def set_secret(service_client, project_id, secret_id):
    """Set the pending secret in the database

    This method tries to login to the database with the PENDING secret and returns on
    success. If that fails, it checks that the PENDING username is the alternate of the
    CURRENT username and that both target the same host, verifies that the CURRENT
    credentials still work, and then logs in with the credentials stored in the secret
    named by ``masterUser`` in the CURRENT secret. With that connection it either creates
    the PENDING role (granting it the CURRENT role) or resets the password of the existing
    PENDING role.

    Args:
        service_client (client): The secrets manager service client
        project_id (string): The project_id or other project identifier
        secret_id (string): The secret_id or other secret identifier

    Raises:
        ValueError: If a secret is not valid JSON, the PENDING user/host do not match the
            CURRENT secret, or the current/master credentials could not be used to login
        KeyError: If the secret json does not contain the expected keys
    """
    current_dict = get_secret_dict(service_client, project_id, secret_id, "CURRENT")
    pending_dict = get_secret_dict(service_client, project_id, secret_id, "PENDING")

    # First try to login with the pending secret, if it succeeds, return
    conn = get_connection(pending_dict)
    if conn:
        conn.close()
        logger.info(
            "setSecret: PENDING secret is already set as password in PostgresSQL DB for secret %s",
            secret_id,
        )
        return

    # Make sure the user from current and pending match
    if get_alt_username(current_dict["username"]) != pending_dict["username"]:
        message = "Attempting to modify user %s other than current user or clone %s" % (
            pending_dict["username"],
            current_dict["username"],
        )
        logger.error("setSecret: %s", message)
        raise ValueError(message)

    # Make sure the host from current and pending match
    if current_dict["host"] != pending_dict["host"]:
        message = "Attempting to modify user for host %s other than current host %s" % (
            pending_dict["host"],
            current_dict["host"],
        )
        logger.error("setSecret: %s", message)
        raise ValueError(message)

    # Test CURRENT secret is valid by logging in
    conn = get_connection(current_dict)
    if not conn:
        message = (
            "Unable to log into database using current credentials for secret %s"
            % secret_id
        )
        logger.error("setSecret: %s", message)
        raise ValueError(message)
    # This connection is only a login probe; close it so it is not leaked when
    # ``conn`` is rebound to the master-user connection below.
    conn.close()

    # Using the master user secret
    master_user_secret_id = current_dict["masterUser"]
    super_user_dict = get_secret_dict(
        service_client, project_id, master_user_secret_id, "latest"
    )
    # Connect to the same database the rotated user works against.
    super_user_dict["dbname"] = current_dict.get("dbname", None)
    conn = get_connection(super_user_dict)
    if not conn:
        message = (
            "Unable to log into database using credentials in master secret %s"
            % master_user_secret_id
        )
        logger.error("setSecret: %s", message)
        raise ValueError(message)

    try:
        with conn.cursor() as cur:
            # Let the server quote the role names; identifiers cannot be bound as
            # query parameters.
            cur.execute("SELECT quote_ident(%s)", (pending_dict["username"],))
            pending_username = cur.fetchone()[0]
            cur.execute("SELECT quote_ident(%s)", (current_dict["username"],))
            current_username = cur.fetchone()[0]

            # psycopg2 treats "%" as a placeholder marker whenever parameters are
            # passed, so a literal "%" in the quoted identifier must be doubled.
            pending_for_params = pending_username.replace("%", "%%")

            cur.execute(
                "SELECT 1 FROM pg_roles where rolname = %s", (pending_dict["username"],)
            )
            if not cur.fetchall():
                # Role does not exist yet: create it and let it inherit from CURRENT.
                cur.execute(
                    f"CREATE ROLE {pending_for_params} WITH LOGIN PASSWORD %s",
                    (pending_dict["password"],),
                )
                # No parameters are passed here, so the text is sent as written.
                cur.execute("GRANT %s TO %s" % (current_username, pending_username))
            else:
                cur.execute(
                    f"ALTER USER {pending_for_params} WITH PASSWORD %s",
                    (pending_dict["password"],),
                )
        conn.commit()
        logger.info(
            "setSecret: Successfully set password for %s in PostgreSQL DB for secret id %s.",
            pending_dict["username"],
            secret_id,
        )
    finally:
        conn.close()
def test_secret(service_client, project_id, secret_id):
    """Test the pending secret against the database

    This method logs into the database with the secret version aliased PENDING and runs
    a trivial query (``SELECT NOW()``) to confirm the connection is usable.

    Args:
        service_client (client): The secrets manager service client
        project_id (string): The project_id or other project identifier
        secret_id (string): The secret_id or other secret identifier

    Raises:
        ValueError: If the secret is not valid JSON or pending credentials could not be
            used to login to the database
        KeyError: If the secret json does not contain the expected keys
    """
    pending_dict = get_secret_dict(service_client, project_id, secret_id, "PENDING")

    conn = get_connection(pending_dict)
    if not conn:
        logger.error(
            "testSecret: Unable to log into database with pending secret of secret_id %s",
            secret_id,
        )
        raise ValueError(
            "Unable to log into database with pending secret of secret %s" % secret_id
        )

    try:
        with conn.cursor() as cur:
            cur.execute("SELECT NOW()")
        # Call commit; the bare attribute reference in the original did nothing.
        conn.commit()
    finally:
        # Single close on every path (the original closed the connection twice).
        conn.close()

    logger.info(
        "testSecret: Successfully signed into PostgreSQL DB with PENDING secret in %s",
        secret_id,
    )
def finish_secret(service_client, project_id, secret_id):
    """Finish the rotation by promoting the PENDING alias to CURRENT

    Reads the secret's version aliases, points CURRENT at the version PENDING refers to,
    removes the PENDING alias and writes the alias map back.

    Args:
        service_client (client): The secrets manager service client
        project_id (string): The project_id or other project identifier
        secret_id (string): The secret_id or other secret identifier

    Raises:
        KeyError: If the secret has no PENDING alias
    """
    secret = get_secret(service_client, project_id, secret_id)
    aliases = dict(secret.version_aliases)
    # pop() both reads and removes PENDING; it raises KeyError when the alias is absent.
    aliases["CURRENT"] = aliases.pop("PENDING")
    update_secret_aliases(service_client, project_id, secret_id, aliases)
def get_alt_username(current_username):
    """Return the alternate username to rotate to.

    Rotation alternates between a base user and its ``_clone`` twin: a name ending in
    ``_clone`` maps back to the base name, any other name gets ``_clone`` appended.

    Args:
        current_username (string): The username currently in use

    Returns:
        string: The alternate username

    Raises:
        ValueError: If appending ``_clone`` would exceed the 63-byte limit
    """
    clone_suffix = "_clone"
    if current_username.endswith(clone_suffix):
        return current_username[: -len(clone_suffix)]

    new_username = current_username + clone_suffix
    # PostgreSQL limits identifiers to 63 bytes, not characters, so measure the
    # encoded length; a multi-byte name can be over the limit while len() is under it.
    if len(new_username.encode("utf-8")) > 63:
        raise ValueError(
            "Unable to clone user, username length with _clone appended would exceed 63 characters"
        )
    return new_username
def get_random_password(length: int = 32) -> str:
    """Return a random password of ``length`` ASCII letters and digits.

    Characters are drawn one at a time with ``secrets.choice``.
    """
    pool = string.ascii_letters + string.digits
    picked = [secrets.choice(pool) for _ in range(length)]
    return "".join(picked)
def get_secret_dict(service_client, project_id, secret_id, version):
    """Fetch a secret version and return its JSON payload as a dict.

    Args:
        service_client (client): The secrets manager service client
        project_id (string): The project_id or other project identifier
        secret_id (string): The secret_id or other secret identifier
        version (string): Version number or alias to read (e.g. "CURRENT", "PENDING")

    Returns:
        dict: The decoded secret payload

    Raises:
        ValueError: If the payload is not valid JSON
        KeyError: If "host", "username" or "password" is missing from the payload
    """
    raw_payload = access_secret_version(
        service_client, project_id, secret_id, version_id=version
    )
    parsed = json.loads(raw_payload)

    # Report the first required key that is absent, in this fixed order.
    missing = next(
        (key for key in ("host", "username", "password") if key not in parsed), None
    )
    if missing is not None:
        raise KeyError("%s key is missing from secret JSON" % missing)
    return parsed
def extract_version(path):
    """Return the integer that follows the first "versions" segment of ``path``.

    Raises:
        ValueError: If ``path`` has no "versions" segment or the next segment is not an int
        IndexError: If "versions" is the last segment
    """
    segments = path.split("/")
    marker = segments.index("versions")
    # The version number is the segment right after the marker.
    return int(segments[marker + 1])
def add_secret_version(service_client, secret_id, payload):
    """Add ``payload`` as a new version of a secret and return the client's response.

    Args:
        service_client (client): The secrets manager service client
        secret_id (string): Parent secret resource name passed as ``parent``
        payload (string): Secret text; it is UTF-8 encoded before being sent

    Returns:
        The response of ``service_client.add_secret_version``
    """
    encoded = payload.encode("UTF-8")
    created = service_client.add_secret_version(
        parent=secret_id, payload={"data": encoded}
    )
    # Log the new secret version name.
    logger.info(f"Added secret version: {created.name}")
    return created
def access_secret_version(service_client, project_id, secret_id, version_id="latest"):
    """Return the UTF-8 decoded payload of one secret version.

    Args:
        service_client (client): The secrets manager service client
        project_id (string): The project_id or other project identifier
        secret_id (string): The secret_id or other secret identifier
        version_id (string): Version number or alias; defaults to "latest"

    Returns:
        string: The secret payload decoded as UTF-8
    """
    resource = f"projects/{project_id}/secrets/{secret_id}/versions/{version_id}"
    accessed = service_client.access_secret_version(name=resource)
    return accessed.payload.data.decode("UTF-8")
def get_secret(service_client, project_id, secret_id) -> secretmanager.Secret:
    """
    Get information about the given secret. This only returns metadata about
    the secret container, not any secret material.

    Args:
        service_client (client): The secrets manager service client
        project_id (string): The project_id or other project identifier
        secret_id (string): The secret_id or other secret identifier

    Returns:
        The secret resource returned by ``service_client.get_secret``; callers in
        this file read its ``version_aliases``.
    """
    # Build the resource name of the secret.
    name = service_client.secret_path(project_id, secret_id)
    # Get the secret. The annotation is the returned resource, not the request type.
    return service_client.get_secret(request={"name": name})
def update_secret_aliases(
    service_client, project_id, secret_id, aliases
) -> secretmanager.Secret:
    """
    Replace the version aliases of an existing secret.

    Args:
        service_client (client): The secrets manager service client
        project_id (string): The project_id or other project identifier
        secret_id (string): The secret_id or other secret identifier
        aliases (dict): Complete alias-to-version-number mapping to store

    Returns:
        The updated secret returned by ``service_client.update_secret``
    """
    # Build the resource name of the secret.
    name = service_client.secret_path(project_id, secret_id)
    # The update mask restricts the write to the alias map.
    secret = {"name": name, "version_aliases": aliases}
    update_mask = {"paths": ["version_aliases"]}
    response = service_client.update_secret(
        request={"secret": secret, "update_mask": update_mask}
    )
    # Log through the module logger, like the other helpers in this file.
    logger.info(f"Updated secret aliases: {response.name}")
    # Return the response so the annotation holds; the original returned nothing.
    return response
def parse_project_and_secret_id(secret_name):
    """Split a "projects/{PROJECT_ID}/secrets/{secret_id}" name into its two ids.

    Returns:
        tuple: ``(project, secret_id)``, taken from the second and fourth "/"-separated
        segments; a segment that does not exist yields ``None`` in its place.
    """
    # Pad with None so short inputs still have positions 1 and 3.
    padded = secret_name.split("/") + [None] * 4
    return padded[1], padded[3]
def get_connection(secret_dict):
    """Open a PostgreSQL connection from a secret payload, or return None on failure.

    Args:
        secret_dict (dict): Secret payload with "username", "password" and "host";
            "port" defaults to 5432 and "dbname" to "postgres" when absent or None.

    Returns:
        The psycopg2 connection, or None if ``psycopg2.OperationalError`` was raised
        while connecting.

    Raises:
        KeyError: If a required key is missing from ``secret_dict``
    """
    port = int(secret_dict.get("port", 5432))
    database = secret_dict.get("dbname")
    if database is None:
        database = "postgres"

    try:
        connection = psycopg2.connect(
            dbname=database,
            user=secret_dict["username"],
            password=secret_dict["password"],
            host=secret_dict["host"],
            port=port,
        )
    except psycopg2.OperationalError:
        # Callers treat None as "these credentials cannot log in".
        return None

    logger.info(
        "Successfully established connection as user '%s' with host: '%s'"
        % (secret_dict["username"], secret_dict["host"])
    )
    return connection
functions-framework==3.*
google-cloud-secret-manager>=2.0.0
psycopg2-binary
# Shared settings, referenced as local.<name> by the resources in this configuration.
locals {
# Project set on the topic, bucket, function and secret below; replace the placeholder.
project_id = "<project_id>"
# Used as the bucket location, the function location and the trigger region.
region = "us-central1"
# NOTE(review): zone is not referenced by any resource visible in this configuration.
zone = "us-central1-c"
# Name of the Cloud Function and base name of the uploaded source object.
rotation_function_name = "secret-rotator"
# Local path of the zipped function source uploaded to the bucket; replace the placeholder.
function_source_path = "path/to/function.zip"
}
# Service account used by the rotation function's build, runtime service and event trigger.
# No project is set here, so the provider's configured project applies.
# NOTE(review): no IAM role bindings for this account are visible in this configuration;
# presumably it needs Secret Manager access granted elsewhere. Confirm.
resource "google_service_account" "account" {
account_id = "secret-manager-sa"
display_name = "Secret Manager Rotation Service Account"
}
# Secret Rotation Topic
# Referenced by the secret's topics block and by the function's event trigger.
resource "google_pubsub_topic" "secret_rotation_topic" {
name = "secret-rotation-topic"
project = local.project_id
}
# Bucket for Cloud Function source
# The project id is part of the bucket name; the bucket is created in local.region.
resource "google_storage_bucket" "rotation_function_bucket" {
name = "rotation-function-source-${local.project_id}"
project = local.project_id
uniform_bucket_level_access = true
location = local.region
}
# Function source
# Uploads the local zip at local.function_source_path as "<function name>.zip"
# into the source bucket.
resource "google_storage_bucket_object" "rotation_source_object" {
name = "${local.rotation_function_name}.zip"
bucket = google_storage_bucket.rotation_function_bucket.name
source = local.function_source_path
}
# Rotation Cloud Function (2nd gen), built from the uploaded zip and triggered by
# messages published to the secret rotation topic.
resource "google_cloudfunctions2_function" "rotation_function" {
name = local.rotation_function_name
location = local.region
project = local.project_id
build_config {
runtime = "python312"
# Must match the name of the decorated Python handler ("rotation").
entry_point = "rotation"
source {
storage_source {
bucket = google_storage_bucket.rotation_function_bucket.name
object = google_storage_bucket_object.rotation_source_object.name
}
}
# The same service account is used for the build, the service and the trigger.
service_account = google_service_account.account.id
}
service_config {
available_memory = "256M"
max_instance_count = 3
min_instance_count = 0
# NOTE(review): a full rotation makes several Secret Manager and database calls;
# confirm 60 seconds is enough.
timeout_seconds = 60
environment_variables = {
"LOG_EXECUTION_ID" = "true"
}
service_account_email = google_service_account.account.email
ingress_settings = "ALLOW_INTERNAL_ONLY"
}
event_trigger {
trigger_region = local.region
event_type = "google.cloud.pubsub.topic.v1.messagePublished"
pubsub_topic = google_pubsub_topic.secret_rotation_topic.id
service_account_email = google_service_account.account.email
# Per the policy name, a failed invocation is not retried.
retry_policy = "RETRY_POLICY_DO_NOT_RETRY"
}
}
# Secret with a rotation schedule, attached to the rotation topic that triggers the function.
# NOTE(review): no secret version and no IAM grant allowing Secret Manager to publish to
# the topic are visible in this configuration. Confirm they are provided elsewhere.
resource "google_secret_manager_secret" "rotation_secret" {
secret_id = "rotation-secret"
project = local.project_id
replication {
auto {}
}
rotation {
next_rotation_time = "2090-10-01T00:00:00Z" # must be in the future
rotation_period = "2592000s" # 30 days
}
# Same topic as the function's event trigger.
topics {
name = google_pubsub_topic.secret_rotation_topic.id
}
}
import functions_framework
import logging
import secrets
import string
from google.cloud import secretmanager
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def add_secret_version(secret_id, payload):
    """Add ``payload`` as a new version of the secret ``secret_id``.

    A new Secret Manager client is created on every call.

    Args:
        secret_id (str): Parent secret resource name passed as ``parent``.
        payload (str): Secret text; it is UTF-8 encoded before being sent.
    """
    encoded = payload.encode("UTF-8")
    sm_client = secretmanager.SecretManagerServiceClient()
    created = sm_client.add_secret_version(parent=secret_id, payload={"data": encoded})
    # Log the new secret version name.
    logger.info(f"Added secret version: {created.name}")
def generate_random_secret(secret_length: int = 25) -> str:
    """Return a random string of ``secret_length`` ASCII letters and digits.

    Characters are drawn one at a time with ``secrets.choice``.
    """
    pool = string.ascii_letters + string.digits
    picked = [secrets.choice(pool) for _ in range(secret_length)]
    return "".join(picked)
@functions_framework.cloud_event
def rotation(cloud_event):
    """Cloud Function entry point: add a new random version on a SECRET_ROTATE event.

    Messages with any other event type, or with no attributes at all, are ignored.

    Args:
        cloud_event (CloudEvent): The triggering event. Its ``data`` is read as
            ``data["message"]["attributes"]`` with ``eventType`` and ``secretId`` keys.

    Raises:
        ValueError: If a SECRET_ROTATE message carries no ``secretId`` attribute.
    """
    # Look the attributes up defensively: a message published without attributes
    # should be skipped like any other non-rotation message, not fail with KeyError.
    message = cloud_event.data.get("message") or {}
    attributes = message.get("attributes") or {}

    # Only Rotate if we're receiving a SECRET_ROTATE Event
    if attributes.get("eventType") != "SECRET_ROTATE":
        return

    # Pull the secret id from the message.
    # SecretId will be in the "projects/{PROJECT_ID}/secrets/{secret_id}" format
    secret_id = attributes.get("secretId")
    if not secret_id:
        raise ValueError("SECRET_ROTATE event is missing the secretId attribute")

    logger.info(f"Rotating secret '{secret_id}'")
    add_secret_version(secret_id, generate_random_secret())
    logger.info("Rotation Complete")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment