Skip to content

Instantly share code, notes, and snippets.

@theseanything
Forked from sgtoj/copy_sm_secrets.py
Last active June 23, 2022 12:43
Show Gist options
  • Save theseanything/1bb8add0077d3a2f5d979c12c6b9f140 to your computer and use it in GitHub Desktop.
Save theseanything/1bb8add0077d3a2f5d979c12c6b9f140 to your computer and use it in GitHub Desktop.
Python script to copy AWS SecretsManager Secrets
#!/usr/bin/env python3
import argparse
import os
import boto3
from botocore.exceptions import ClientError
# -------------------------------------------------------------------- main ---
def copy_secrets(config):
dry_run = config["global.dryrun"]
if dry_run:
print(f"dry run enabled")
src, dst = compile_config(config)
src_secrets = pull_secrets(src["aws.session"], src["ssm.prefix"], config["global.excluded_secrets"])
dst_secrets = tranform_secrets(src["ssm.prefix"], dst["ssm.prefix"], src_secrets)
results = push_secrets(dst["aws.session"], dst_secrets, dry_run)
return results
# --------------------------------------------------------------------- fns ---
def pull_secrets(session, prefix, excluded_secrets):
secrets = list_sm_secrets(session, prefix, excluded_secrets)
for secret in secrets:
secret_value = get_sm_secret_value(session, secret["Name"])
yield secret_value
def push_secret(session, secret, dry_run):
current_secret = get_sm_secret_value(session, secret["Name"])
result = {"action": "nochange", "name": secret["Name"]}
if current_secret is None:
result["action"] = "created"
if not dry_run:
create_sm_secret(session, secret)
elif current_secret["SecretString"] != secret["SecretString"]:
result["action"] = "updated"
if not dry_run:
put_sm_secret_value(session, secret)
return result
def push_secrets(session, secrets, dry_run):
results = []
for secret in secrets:
result = push_secret(session, secret, dry_run)
print(f'{result["action"]:<10}{result["name"]}')
results.append(result)
return results
def tranform_secrets(old, new, secrets):
for secret in secrets:
new_name = secret["Name"].replace(old, new)
secret["Name"] = new_name
yield secret
# --------------------------------------------------------------------- aws ---
def create_sm_secret(session, secret):
sm = session.client("secretsmanager")
params = {"Name": secret["Name"]}
if "SecretString" in secret:
params["SecretString"] = secret["SecretString"]
else:
params["SecretBinary"] = secret["SecretBinary"]
response = sm.create_secret(**params)
return response
def get_sm_secret_value(session, name):
sm = session.client("secretsmanager")
secret = None
try:
secret = sm.get_secret_value(SecretId=name)
except ClientError as err:
if err.response["Error"]["Code"] == "ResourceNotFoundException":
pass # ignore if secret is missing
else:
raise err
return secret or None
def list_sm_secrets(session, filter_prefix, excluded_secrets):
sm = session.client("secretsmanager")
pages = sm.get_paginator("list_secrets").paginate()
for page in pages:
secrets = page["SecretList"]
for secret in secrets:
if (not secret["Name"].startswith(filter_prefix) or
secret["Name"] in excluded_secrets or
secret.get("DeletedDate") is not None):
continue
yield secret
def put_sm_secret_value(session, secret):
sm = session.client("secretsmanager")
params = {"SecretId": secret["Name"]}
if "SecretString" in secret:
params["SecretString"] = secret["SecretString"]
else:
params["SecretBinary"] = secret["SecretBinary"]
response = sm.put_secret_value(**params)
return response
# ------------------------------------------------------------------ config ---
def compile_config(config):
src = config["src"]
dst = config["dst"]
if src.get("ssm.prefix") is None or dst.get("ssm.prefix") is None:
raise Exception("Must define source and destination namespace!")
src["aws.session"] = boto3.Session(
region_name=src["aws.region"], profile_name=src["aws.profile"]
)
dst["aws.session"] = boto3.Session(
region_name=dst["aws.region"], profile_name=dst["aws.profile"]
)
return src, dst
def get_default_config():
config = {
"global.dryrun": int(os.environ.get("DRY_RUN", "1")) == 1,
"global.excluded_secrets": [],
"src": {
"aws.region": os.environ.get("SRC_AWS_REGION", "eu-west-1"),
"aws.profile": os.environ.get("SRC_AWS_PROFILE"),
"ssm.prefix": os.environ.get("SRC_SSM_NAMESPACE"),
},
"dst": {
"aws.region": os.environ.get("DST_AWS_REGION", "eu-west-1"),
"aws.profile": os.environ.get("DST_AWS_PROFILE"),
"ssm.prefix": os.environ.get("DST_SSM_NAMESPACE"),
},
}
return config
# ---------------------------------------------------------------- handlers ---
def script_handler(args):
config = get_default_config()
config["src"]["ssm.prefix"] = args.src_prefix
config["dst"]["ssm.prefix"] = args.dst_prefix
config["src"]["aws.profile"] = args.src_profile
config["dst"]["aws.profile"] = args.dst_profile
config["global.dryrun"] = not args.no_dry_run
config["global.excluded_secrets"] = args.excluded_secret
copy_secrets(config)
def lambda_handler(event, context):
raise Exception("Not implemented yet!")
if __name__ == "__main__":
parser = argparse.ArgumentParser(prog="AWS Secrets Manager Copier")
parser.add_argument("--no-dry-run", help="disable dry run", action="store_true")
parser.add_argument("--src-prefix", help="prefix to filter source secrets", type=str, default="")
parser.add_argument("--dst-prefix", help="prefix to replace on destination secrets", type=str, default="")
parser.add_argument("--src-profile", help="AWS Profile to use for secret source")
parser.add_argument("--dst-profile", help="AWS Profile to use for secret destination")
parser.add_argument("--excluded-secret", help="name of secret to exclude", action="append", default=[])
args = parser.parse_args()
script_handler(args)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment