Last active
June 14, 2024 14:25
-
-
Save filipeandre/ece6f2f62b9ecc681835c65085965af6 to your computer and use it in GitHub Desktop.
It gets the latest GitHub tag from a repository, fetches a stack template from an S3 location, and deploys it using a temporary bucket. It supports deployment to other AWS accounts. The settings are saved, and profiles are supported.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import sys | |
import re | |
import requests | |
import configparser | |
import tempfile | |
import uuid | |
import argparse | |
import subprocess | |
import inquirer | |
from inquirer import List, Text | |
# INI file that stores the per-profile settings. The path is relative, so it
# is resolved against the current working directory.
CONFIG_FILE = "config.ini"
# Color functions | |
def blue(text):
    """Return *text* wrapped in ANSI colour code 94, followed by a reset."""
    return "\033[94m{}\033[0m".format(text)
def green(text):
    """Return *text* wrapped in ANSI colour code 92, followed by a reset."""
    return "\033[92m{}\033[0m".format(text)
def yellow(text):
    """Return *text* wrapped in ANSI colour code 93, followed by a reset."""
    return "\033[93m{}\033[0m".format(text)
def red(text):
    """Return *text* wrapped in ANSI colour code 91, followed by a reset."""
    return "\033[91m{}\033[0m".format(text)
def yes_no_validation(answers, current):
    """Validate a yes/no answer typed into an inquirer prompt.

    The comparison is case-insensitive; the empty string is accepted so the
    prompt's default can be taken by just pressing Enter.

    Args:
        answers: Answers collected so far (unused, required by inquirer).
        current: The text the user entered for this question.

    Returns:
        True when the answer is one of '', 'y', 'n', 'yes', 'no'.

    Raises:
        inquirer.errors.ValidationError: For any other answer.
    """
    accepted = ('', 'y', 'n', 'yes', 'no')
    if current.lower() in accepted:
        return True
    raise inquirer.errors.ValidationError('', reason='Please enter y, n, yes, or no.')
def get_input(prompt, default=None):
    """Read one line from the user, falling back to *default*.

    Args:
        prompt: Text shown to the user before reading.
        default: Value returned when the user enters nothing (only if it is
            not None) or when standard input reaches end-of-file.

    Returns:
        The entered text, or *default* as described above. This can be None
        when input hits EOF and no default was supplied.

    Raises:
        SystemExit: With status 0 when the user presses Ctrl-C.
    """
    # Only input() can raise the exceptions handled here, so keep the try
    # body down to that single call.
    try:
        user_input = input(prompt)
    except EOFError:
        return default
    except KeyboardInterrupt:
        print("\n\n\nCancelled by user\n")
        # sys.exit() rather than the exit() helper: the latter is injected by
        # the site module and is not guaranteed to exist (e.g. `python -S`).
        sys.exit(0)

    if user_input == '' and default is not None:
        return default
    return user_input
def load_config():
    """Return a ConfigParser populated from CONFIG_FILE.

    When the file does not exist the parser is returned empty.
    """
    parser = configparser.ConfigParser()
    # ConfigParser.read() silently skips files it cannot open, so a missing
    # file needs no separate existence check.
    parser.read(CONFIG_FILE)
    return parser
def save_config(config):
    """Write *config* to CONFIG_FILE, replacing any previous contents."""
    with open(CONFIG_FILE, mode='w') as handle:
        config.write(handle)
def get_latest_github_tag(repo_name, github_token, timeout=30):
    """Return the name of the first tag GitHub lists for a repository.

    Args:
        repo_name: Repository in ``owner/name`` form.
        github_token: Token sent as a Bearer credential. When empty, the
            request is sent without an Authorization header.
        timeout: Seconds to wait for GitHub before giving up.

    Returns:
        The ``name`` field of the first entry in the tags listing.
        NOTE(review): this relies on the API's ordering to mean "latest" —
        confirm that matches the repository's tagging scheme.

    Raises:
        requests.HTTPError: If GitHub answers with an error status.
        requests.Timeout: If GitHub does not answer within *timeout*.
        ValueError: If the repository has no tags.
    """
    endpoint = f'https://api.github.com/repos/{repo_name}/tags'
    headers = {
        'X-GitHub-Api-Version': '2022-11-28',
        "Accept": "application/vnd.github.raw+json",
        "Cache-Control": "no-cache",
    }
    if github_token:
        # An empty token would produce a malformed "Bearer " header.
        headers['Authorization'] = f'Bearer {github_token}'

    # requests never times out by default; without this a stalled connection
    # would hang the whole deployment.
    response = requests.get(endpoint, headers=headers, timeout=timeout)
    response.raise_for_status()

    tags = response.json()
    if not tags:
        raise ValueError('No tags found in the repository.')
    return tags[0]['name']
def get_or_input_config(config, profile, key, prompt):
    """Return a profile setting, asking the user for it when it is missing.

    Args:
        config: ConfigParser holding all profiles.
        profile: Name of an existing section in *config*.
        key: Option name to look up.
        prompt: Text shown to the user when the option is not set.

    Returns:
        The stored value, or the newly entered one (which is also stored in
        *config*). An option that is present but empty is returned as ''
        without prompting.
    """
    section = config[profile]
    value = section.get(key)
    if value is None:
        value = get_input(prompt)
        if value is None:
            # get_input() yields None on EOF; ConfigParser only accepts
            # string values, so store an empty string instead of crashing.
            value = ''
        section[key] = value
    return value
def print_config(config, profile):
    """Print every option of *profile* as a coloured ``key: value`` line."""
    heading = blue(f'Current configuration for profile {profile}:')
    print(f"\n{heading}")
    for option, setting in config[profile].items():
        label = green(option)
        print(f"{label}: {setting}")
def edit_config(config, profile):
    """Prompt for a replacement of each option in *profile*.

    An empty answer keeps the current value; anything else overwrites it in
    *config* (nothing is written to disk here).
    """
    heading = blue(f'Editing configuration for profile {profile}:')
    print(f"\n{heading}")
    for option in config[profile]:
        existing = config[profile][option]
        question = yellow(f'Enter new value for {option} (current: {existing}):')
        replacement = get_input(f"{question}")
        if not replacement:
            continue
        config[profile][option] = replacement
def generate_script(config, profile):
    """Build the bash deployment script for *profile*.

    The script downloads the template from S3, creates a temporary bucket,
    runs ``aws cloudformation deploy`` and, on exit, removes the downloaded
    file and the temporary bucket. When both ``account_id`` and
    ``iam_access_role`` are set it first assumes that role via STS.

    Args:
        config: ConfigParser holding all profiles.
        profile: Section whose settings are used. It must contain
            aws_region, bucket_name, github_repo, github_token, bucket_path,
            stack_name, iam_access_role and account_id.

    Returns:
        The script text, starting with the ``#!/bin/bash`` line.

    Raises:
        KeyError: If one of the required settings is missing.
        Exception: Whatever get_latest_github_tag() raises is propagated.
    """
    import shlex  # function-scope import: only this block needs it

    settings = config[profile]
    aws_region = settings['aws_region']
    bucket_name = settings['bucket_name']
    github_repo = settings['github_repo']
    github_token = settings['github_token']
    bucket_path = settings['bucket_path']
    stack_name = settings['stack_name']
    iam_access_role = settings['iam_access_role']
    account_id = settings['account_id']

    # Substitute the "(.*)" placeholder literally. re.sub() would treat the
    # tag as a replacement template and misread backslashes/group references.
    latest_tag = get_latest_github_tag(github_repo, github_token)
    bucket_path = bucket_path.replace('(.*)', latest_tag)

    # Random suffix keeps the temporary bucket name unique per run.
    temp_bucket_name = f"{bucket_name}-temp-{uuid.uuid4().hex[:8]}"

    # Reserve a path for the downloaded template; close the handle right away
    # so it is not leaked (the file itself is kept for the script to use).
    with tempfile.NamedTemporaryFile(delete=False) as placeholder:
        temp_file_path = placeholder.name

    # Every interpolated value is shell-quoted so spaces or metacharacters in
    # the configuration cannot split or alter the generated commands.
    quote = shlex.quote
    region_arg = quote(aws_region)
    temp_file_arg = quote(temp_file_path)
    temp_bucket_arg = quote(temp_bucket_name)
    temp_bucket_url = quote(f"s3://{temp_bucket_name}")
    template_url = quote(f"s3://{bucket_name}/{bucket_path}")
    stack_arg = quote(stack_name)

    assume_role = ""
    assume_role_cleanup = ""
    if account_id and iam_access_role:
        role_arn = quote(f"arn:aws:iam::{account_id}:role/{iam_access_role}")
        # `credentials` is assigned without `export`: `export var=$(cmd)`
        # hides cmd's exit status, so `set -e` would not stop the script when
        # assume-role fails.
        assume_role = f"""
iam_role_arn={role_arn}
echo {quote(blue('Assuming role'))} "$iam_role_arn"
credentials=$(aws sts assume-role --role-arn "$iam_role_arn" --role-session-name {temp_bucket_arg} --query Credentials)
export AWS_ACCESS_KEY_ID=$(echo "$credentials" | jq -r .AccessKeyId)
export AWS_SECRET_ACCESS_KEY=$(echo "$credentials" | jq -r .SecretAccessKey)
export AWS_SESSION_TOKEN=$(echo "$credentials" | jq -r .SessionToken)
"""
        assume_role_cleanup = f"""
echo {quote(blue('Unsetting AWS credentials'))}
unset AWS_ACCESS_KEY_ID
unset AWS_SECRET_ACCESS_KEY
unset AWS_SESSION_TOKEN
"""

    # The shebang must be the very first bytes of the file to take effect, so
    # the literal starts with it instead of a newline.
    script = f"""#!/bin/bash
set -e
# Function to perform cleanup
cleanup() {{
    echo {quote(blue('Cleaning up...'))}
    # Remove temporary file
    if [ -f {temp_file_arg} ]; then
        echo {quote(blue('Removing temp file'))} {temp_file_arg}
        rm {temp_file_arg}
    fi
    # Remove temporary S3 bucket
    echo {quote(blue('Deleting temporary bucket'))} {temp_bucket_arg}
    aws s3 rb {temp_bucket_url} --force --region {region_arg}
{assume_role_cleanup}
}}
command_exists() {{
    command -v "$1" >/dev/null 2>&1
}}
if ! command_exists jq; then
    if ! command_exists brew; then
        echo {quote(yellow('jq is not installed. Please install jq and try again.'))}
        exit 1
    fi
    echo {quote(blue('Installing jq using Homebrew...'))}
    brew install jq
fi
if ! command_exists aws; then
    echo {quote(yellow('aws cli is not installed. Please install aws cli and try again.'))}
    exit 1
fi
# Trap EXIT signal to invoke cleanup function
trap cleanup EXIT
{assume_role}
echo {quote(blue('Downloading template from S3'))}
aws s3 cp {template_url} {temp_file_arg} --region {region_arg}
echo {quote(blue('Creating temporary bucket'))} {temp_bucket_arg}
aws s3 mb {temp_bucket_url} --region {region_arg}
echo {quote(blue('Deploying CloudFormation stack'))} {stack_arg} {quote(blue('to'))} {region_arg}
aws cloudformation deploy --stack-name {stack_arg} --template-file {temp_file_arg} --s3-bucket {temp_bucket_arg} --region {region_arg} --capabilities CAPABILITY_NAMED_IAM CAPABILITY_IAM
echo {quote(green('Script execution complete'))}
"""
    return script
def execute_command(command):
    """Run *command* through the shell, echoing its output as it arrives.

    stderr is merged into stdout. Every line the process writes is forwarded
    to ``sys.stdout`` and flushed immediately.

    Args:
        command: Shell command line to execute.

    Returns:
        The exit status of the command.
    """
    with subprocess.Popen(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        shell=True,
        text=True,
    ) as process:
        # Iterate until EOF rather than until the process exits: a process
        # can terminate with several lines still buffered in the pipe, and
        # stopping at exit would drop them.
        for line in process.stdout:
            sys.stdout.write(line)
            sys.stdout.flush()
    # Leaving the `with` block waits for the process, so returncode is set.
    return process.returncode
def select_profile(config, profile_arg=None):
    """Pick the profile to work with, creating its section when needed.

    Args:
        config: ConfigParser holding all profiles; a new section is added to
            it when the chosen profile does not exist yet.
        profile_arg: Profile name from the command line. It is used directly
            when it names an existing profile; otherwise the user is asked to
            choose from a list.

    Returns:
        The name of the selected (possibly newly created) profile.

    Raises:
        SystemExit: When the selection prompt is cancelled, or when input
            ends while a new profile name is being requested.
    """
    create_option = "Create new profile"
    profiles = config.sections()
    profiles.append(create_option)

    if profile_arg and profile_arg in profiles:
        selected_profile = profile_arg
    else:
        questions = [
            List(
                'profile',
                message=f"{blue('Select a profile to use:')}",
                choices=profiles,
            ),
        ]
        answers = inquirer.prompt(questions)
        if not answers:
            sys.exit(0)
        selected_profile = answers['profile']

    if selected_profile == create_option:
        # Keep asking until the name is usable. An empty name would be saved
        # as a "[]" header that configparser cannot read back, and the
        # default-section name is rejected by add_section().
        while True:
            name = get_input(f"{yellow('Enter the name for the new profile: ')}")
            if name is None:
                # End of input: no name can be obtained.
                print(red('No profile name provided.'))
                sys.exit(1)
            name = name.strip()
            if name and name not in (config.default_section, create_option):
                break
            print(red('Please enter a non-empty, non-reserved profile name.'))
        selected_profile = name

    if not config.has_section(selected_profile):
        config.add_section(selected_profile)
    return selected_profile
def configure_profile(config, profile, args):
    """Collect any missing settings for *profile*, confirm them and save.

    Missing options are requested one by one. Unless ``args.yes`` is set, the
    resulting configuration is shown and the user is asked to confirm it; a
    negative answer opens the editor and the cycle repeats.

    Args:
        config: ConfigParser holding all profiles.
        profile: Name of an existing section in *config*.
        args: Parsed command-line arguments; only ``args.yes`` is read.

    Raises:
        SystemExit: With status 0 when the confirmation prompt is cancelled.
    """
    prompts = (
        ('aws_region', 'Enter AWS region: '),
        ('bucket_name', 'Enter S3 bucket name: '),
        ('github_repo', 'Enter GitHub repository name (e.g., user/repo): '),
        ('github_token', 'Enter GitHub token: '),
        ('bucket_path', 'Enter bucket path (with a placeholder for the tag, e.g., templates/my-template-(.*).yaml): '),
        ('stack_name', 'Enter CloudFormation stack name: '),
        ('iam_access_role', 'Enter IAM role name (e.g., MyRole): '),
        ('account_id', 'Enter AWS account ID: '),
    )

    while True:
        for key, text in prompts:
            get_or_input_config(config, profile, key, f"{yellow(text)}")

        if args.yes:
            confirm = True
        else:
            print_config(config, profile)
            questions = [
                Text(
                    'confirm',
                    message=f"{blue('Is the above configuration correct? (Y/n)')}",
                    default='',
                    validate=yes_no_validation,
                ),
            ]
            answers = inquirer.prompt(questions)
            if not answers:
                sys.exit(0)
            # The validator accepts any letter case, so compare in lower case;
            # otherwise an upper-case "Y" would be treated as "no".
            confirm = answers['confirm'].lower() in ("yes", "y", "")

        if confirm:
            save_config(config)
            break
        edit_config(config, profile)
def handle_execution(script, execute_flag):
    """Run *script*, optionally after showing it and asking for approval.

    Args:
        script: Text of the shell script to run.
        execute_flag: When true, run immediately. Otherwise print the script
            and ask the user whether to run it.

    Raises:
        SystemExit: With status 0 when the prompt is cancelled, or with the
            script's exit status when it reports a failure.
    """
    if not execute_flag:
        print(f"\n{blue('Commands:')}")
        print(script)
        questions = [
            Text(
                'execute',
                message=f"{blue('Do you want to execute these commands now? (Y/n)')}",
                default='',
                validate=yes_no_validation,
            ),
        ]
        answers = inquirer.prompt(questions)
        if not answers:
            sys.exit(0)
        # The validator accepts any letter case, so compare in lower case;
        # otherwise an upper-case "Y" would silently skip the execution.
        if answers['execute'].lower() not in ("yes", "y", ""):
            return

    # Write the script to a temporary file so it can be run as one unit.
    script_file = tempfile.NamedTemporaryFile(mode='w', delete=False)
    try:
        script_file.write(script)
        script_file.close()
        os.chmod(script_file.name, 0o755)
        return_code = execute_command(script_file.name)
    finally:
        # close() is idempotent; this covers a failure inside write().
        script_file.close()
        os.remove(script_file.name)

    # Surface a failed deployment through the process exit status. A falsy
    # value (0, or None when no status is reported) counts as success.
    if return_code:
        sys.exit(return_code)
def main():
    """Command-line entry point.

    Parses the flags, loads the stored configuration, resolves and completes
    the chosen profile, generates the deployment script and hands it over for
    display or execution.
    """
    cli = argparse.ArgumentParser(
        description='Run or print deployment commands based on the configuration.',
    )
    cli.add_argument(
        '--exec',
        action='store_true',
        help='Execute the deployment commands instead of printing them',
    )
    cli.add_argument(
        '--yes',
        action='store_true',
        help='Automatically confirm the configuration without prompting',
    )
    cli.add_argument('--profile', help='Specify the profile name to use')
    options = cli.parse_args()

    settings = load_config()
    profile_name = select_profile(settings, options.profile)
    configure_profile(settings, profile_name, options)

    deployment_script = generate_script(settings, profile_name)
    handle_execution(deployment_script, options.exec)
# Run the CLI only when this file is executed directly, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment