Last active
April 22, 2025 20:55
-
-
Save randy-jerome/da5ad2ab1b24b414b78467a094ae5087 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
Azure Key Vault to Environment Variables | |
This script retrieves secrets from Azure Key Vault and sets them as environment variables. | |
It uses DefaultAzureCredential for authentication, which supports multiple authentication methods: | |
- Managed Identity | |
- Azure CLI credentials | |
- Visual Studio Code credentials | |
- Interactive browser authentication | |
- Environment variables (client ID, tenant ID, client secret) | |
Usage: | |
python azure_kv_to_env.py --vault-name <vault-name> [--secrets <secret1,secret2,...>] | |
# Use with eval in bash to set environment variables directly: | |
eval $(python azure_kv_to_env.py --vault-name <vault-name> --eval) | |
Dependencies: | |
pip install azure-identity azure-keyvault-secrets | |
""" | |
import argparse | |
import os | |
import logging | |
import sys | |
from typing import List, Optional | |
# Azure SDK imports | |
from azure.identity import DefaultAzureCredential, CredentialUnavailableError | |
from azure.keyvault.secrets import SecretClient | |
from azure.core.exceptions import ( | |
ClientAuthenticationError, | |
HttpResponseError, | |
ResourceNotFoundError, | |
ServiceRequestError, | |
) | |
# Configure logging | |
logging.basicConfig( | |
level=logging.ERROR, | |
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", | |
handlers=[logging.StreamHandler(sys.stdout)] | |
) | |
logger = logging.getLogger(__name__) | |
def setup_argument_parser() -> argparse.ArgumentParser: | |
"""Setup and return the argument parser.""" | |
parser = argparse.ArgumentParser( | |
description="Retrieve secrets from Azure Key Vault and set as environment variables." | |
) | |
parser.add_argument( | |
"--vault-name", | |
required=True, | |
help="Name of the Azure Key Vault (without .vault.azure.net)." | |
) | |
parser.add_argument( | |
"--secrets", | |
help="Comma-separated list of secret names to retrieve. If not specified, all accessible secrets will be retrieved.", | |
) | |
parser.add_argument( | |
"--prefix", | |
default="", | |
help="Optional prefix to add to environment variable names." | |
) | |
parser.add_argument( | |
"--debug", | |
action="store_true", | |
help="Enable debug logging." | |
) | |
parser.add_argument( | |
"--show", | |
action="store_true", | |
help="Show environment variables after setting them." | |
) | |
parser.add_argument( | |
"--eval", | |
action="store_true", | |
help="Output in a format suitable for bash eval: eval $(python azure_kv_to_env.py --vault-name name --eval)" | |
) | |
return parser | |
def get_secret_client(vault_name: str) -> SecretClient: | |
""" | |
Create and return a SecretClient for the specified Key Vault. | |
Args: | |
vault_name: Name of the Azure Key Vault. | |
Returns: | |
SecretClient: Client for interacting with Key Vault secrets. | |
Raises: | |
CredentialUnavailableError: If no valid credential is available. | |
ClientAuthenticationError: If authentication fails. | |
""" | |
vault_url = f"https://{vault_name}.vault.azure.net" | |
try: | |
# Use DefaultAzureCredential which tries multiple authentication methods | |
credential = DefaultAzureCredential() | |
client = SecretClient(vault_url=vault_url, credential=credential) | |
return client | |
except CredentialUnavailableError as e: | |
logger.error(f"No valid authentication method available: {str(e)}") | |
raise | |
except ClientAuthenticationError as e: | |
logger.error(f"Authentication failed: {str(e)}") | |
raise | |
def get_secrets(client: SecretClient, secret_names: Optional[List[str]] = None) -> dict: | |
""" | |
Retrieve secrets from Key Vault. | |
Args: | |
client: SecretClient instance. | |
secret_names: Optional list of specific secret names to retrieve. | |
Returns: | |
dict: Dictionary mapping secret names to values. | |
""" | |
secrets = {} | |
try: | |
if secret_names: | |
# Get specific secrets | |
for name in secret_names: | |
try: | |
secret = client.get_secret(name) | |
secrets[name] = secret.value | |
logger.info(f"Retrieved secret: {name}") | |
except ResourceNotFoundError: | |
logger.warning(f"Secret not found: {name}") | |
except HttpResponseError as e: | |
logger.error(f"Error retrieving secret {name}: {str(e)}") | |
else: | |
# Get all secrets | |
for secret_properties in client.list_properties_of_secrets(): | |
try: | |
secret = client.get_secret(secret_properties.name) | |
secrets[secret_properties.name] = secret.value | |
logger.info(f"Retrieved secret: {secret_properties.name}") | |
except HttpResponseError as e: | |
logger.error(f"Error retrieving secret {secret_properties.name}: {str(e)}") | |
except HttpResponseError as e: | |
logger.error(f"Error accessing Key Vault: {str(e)}") | |
raise | |
return secrets | |
def set_environment_variables(secrets: dict, prefix: str = "") -> None: | |
""" | |
Set secrets as environment variables. | |
Args: | |
secrets: Dictionary mapping secret names to values. | |
prefix: Optional prefix to add to environment variable names. | |
""" | |
for name, value in secrets.items(): | |
# Replace dashes with underscores to create a valid environment variable name | |
env_name = name.replace("-", "_") | |
env_var_name = f"{prefix}{env_name}".upper() | |
os.environ[env_var_name] = value | |
logger.info(f"Set environment variable: {env_var_name}") | |
def show_export_commands(secrets: dict, prefix: str = "") -> None: | |
""" | |
Print export commands for the secrets that can be copied to the terminal. | |
Args: | |
secrets: Dictionary mapping secret names to values. | |
prefix: Optional prefix to add to environment variable names. | |
""" | |
print("\n# Run the following commands to set the environment variables in your bash shell:") | |
for name, value in secrets.items(): | |
# Replace dashes with underscores to create a valid environment variable name | |
env_name = name.replace("-", "_") | |
env_var_name = f"{prefix}{env_name}".upper() | |
print(f"export {env_var_name}=\"{value}\"") | |
print("\n# Or copy and save to a file, then run 'source filename.sh'") | |
def format_for_eval(secrets: dict, prefix: str = "") -> None: | |
""" | |
Print export commands in a format suitable for eval in bash. | |
No extra text, just the export commands for direct use with eval. | |
Args: | |
secrets: Dictionary mapping secret names to values. | |
prefix: Optional prefix to add to environment variable names. | |
""" | |
for name, value in secrets.items(): | |
# Replace dashes with underscores to create a valid environment variable name | |
env_name = name.replace("-", "_") | |
env_var_name = f"{prefix}{env_name}".upper() | |
print(f"export {env_var_name}=\"{value}\";") | |
def main(): | |
"""Main function to run the script.""" | |
parser = setup_argument_parser() | |
args = parser.parse_args() | |
if args.debug: | |
logger.setLevel(logging.DEBUG) | |
try: | |
# Get secrets from Key Vault | |
client = get_secret_client(args.vault_name) | |
secret_names = None | |
if args.secrets: | |
secret_names = [s.strip() for s in args.secrets.split(",")] | |
secrets = get_secrets(client, secret_names) | |
if not secrets: | |
logger.warning("No secrets were retrieved.") | |
return | |
# Set environment variables in the Python process (won't affect parent shell) | |
set_environment_variables(secrets, args.prefix) | |
logger.info(f"Successfully set {len(secrets)} environment variables.") | |
# Output based on the mode selected | |
if args.eval: | |
# Format for use with bash eval | |
format_for_eval(secrets, args.prefix) | |
elif args.show: | |
# Show export commands | |
show_export_commands(secrets, args.prefix) | |
else: | |
# Default: show export commands | |
show_export_commands(secrets, args.prefix) | |
except (CredentialUnavailableError, ClientAuthenticationError) as e: | |
logger.error(f"Authentication error: {str(e)}") | |
sys.exit(1) | |
except HttpResponseError as e: | |
logger.error(f"Key Vault error: {str(e)}") | |
sys.exit(1) | |
except ServiceRequestError as e: | |
logger.error(f"Network error when connecting to Key Vault: {str(e)}") | |
sys.exit(1) | |
except Exception as e: | |
logger.error(f"Unexpected error: {str(e)}") | |
sys.exit(1) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment