Created
January 27, 2020 12:50
-
-
Save katebee/74817b4271ad8ed5873163ec1b9537b4 to your computer and use it in GitHub Desktop.
Script to run AWS API calls across accounts to fetch stack outputs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
import csv | |
import datetime | |
import json | |
import logging | |
import os | |
import os.path | |
import sys | |
from shutil import rmtree | |
from typing import List, Dict, Union | |
import boto3 | |
from boto3.session import Session | |
from botocore.client import BaseClient | |
from botocore.exceptions import ClientError, ProfileNotFound | |
__description__ = "Run AWS API calls across accounts to fetch stack outputs" | |
def make_directory(path): | |
try: | |
os.mkdir(path) | |
except OSError: | |
# Already exists | |
pass | |
def custom_serializer(x): | |
if isinstance(x, datetime.datetime): | |
return x.isoformat() | |
elif isinstance(x, bytes): | |
return x.decode() | |
raise TypeError("Unknown type") | |
def write_file(contents: Dict, path): | |
with open(f"{path}/stack_outputs.json", "w+") as f: | |
f.write(json.dumps(contents, indent=4, sort_keys=True, default=custom_serializer)) | |
def create_session(profile: str, region: str) -> Union[Session, Dict]: | |
try: | |
return boto3.Session(profile_name=profile, region_name=region) | |
except ProfileNotFound as exception: | |
print(f"ProfileNotFound: {exception}", flush=True) | |
return { | |
'exception': exception | |
} | |
except ClientError as exception: | |
print(f"ClientError: {exception}", flush=True) | |
return { | |
'exception': exception | |
} | |
def with_retries(max_retries: int, handler: BaseClient, method_to_call: str, parameters: Dict) -> Dict: | |
data = None | |
try: | |
for retry in range(max_retries): | |
if handler.can_paginate(method_to_call): | |
paginator = handler.get_paginator(method_to_call) | |
page_iterator = paginator.paginate(**parameters) | |
for response in page_iterator: | |
if not data: | |
data = response | |
else: | |
print(" ...paginating", flush=True) | |
for k in data: | |
if isinstance(data[k], list): | |
data[k].extend(response[k]) | |
else: | |
function = getattr(handler, method_to_call) | |
data = function(**parameters) | |
except ClientError as exception: | |
print(f"ClientError: {exception}", flush=True) | |
return { | |
'exception': exception | |
} | |
return data | |
def save_report(outputfile: str, path: str, handler: BaseClient, method_to_call: str, parameters): | |
make_directory("account-data") | |
make_directory(f"account-data/{path}") | |
if os.path.isfile(outputfile): | |
# Data already collected, so skip | |
print(f"Response already collected at {outputfile}", flush=True) | |
print(f"Making call for {outputfile}", flush=True) | |
output = with_retries(1, handler, method_to_call, parameters) | |
write_file(output) | |
def get_profiles(config_path: str) -> List[str]: | |
try: | |
config = json.load(open(config_path)) | |
return config.get('profiles', ['default']) | |
except IOError: | |
exit(f'ERROR: Unable to load config file "{config_path}"') | |
except ValueError as e: | |
exit(f'ERROR: Config file "{config_path}" could not be loaded ({e})') | |
def get_accounts(filename: str) -> List[str]: | |
with open(filename, newline='') as csvfile: | |
accounts = csv.reader(csvfile, delimiter=',') | |
return accounts | |
def filter_by_name(stacks: List[Dict], target_prefix: str) -> List[Dict]: | |
targets = filter(lambda stack: stack.get('StackName', '').startswith(target_prefix), stacks) | |
return list(targets) | |
def collect(profile: str, region: str, target_name: str) -> Dict: | |
session = create_session(profile, region) | |
cfn_client = session.client('cloudformation') | |
response = with_retries(1, cfn_client, 'describe_stacks', {}) | |
all_stacks = response.get('Stacks') | |
if all_stacks is not None: | |
target_stacks = filter_by_name(all_stacks, target_name) | |
for stack in target_stacks: | |
return { | |
'outputs': stack.get('Outputs', []) | |
} | |
if 'exception' in response: | |
return { | |
'exception': str(response['exception']) | |
} | |
def print_summary(summary: Dict): | |
print("--------------------------------------------------------------------") | |
failures = [] | |
for profile in summary: | |
failure = summary[profile].get('exception', None) | |
if failure is not None: | |
failures.append(failure) | |
print(f"Summary: {len(summary)} APIs called. {len(failures)} errors") | |
if len(failures) > 0: | |
print("Failures:") | |
for failure in failures: | |
print(failure) | |
# Ensure errors can be detected | |
exit(-1) | |
def run(arguments): | |
parser = argparse.ArgumentParser() | |
parser.add_argument( | |
"--config", | |
help="Config file name", | |
default="config.json", | |
type=str | |
) | |
parser.add_argument( | |
"--target", | |
help="prefix of the StackSet that should be targeted", | |
required=True, | |
type=str, | |
dest="target", | |
) | |
parser.add_argument( | |
"--region", | |
help="AWS region to inspect stacks in", | |
required=False, | |
type=str, | |
dest="region", | |
default="eu-west-1" | |
) | |
parser.add_argument( | |
"--profiles", | |
help="AWS profile names to iterate through", | |
required=False, | |
type=str, | |
dest="profiles" | |
) | |
parser.add_argument( | |
"--clean", | |
help="Remove any existing data before gathering", | |
action="store_true", | |
) | |
args = parser.parse_args(arguments) | |
target = args.target | |
region = args.region | |
if args.profiles: | |
profiles = args.profiles.split(",") | |
else: | |
# TODO: store and retrieve profile names in AWS | |
profiles = get_profiles(args.config) | |
logging.getLogger("botocore").setLevel(logging.WARN) | |
# cleaning report stuff | |
path = "account-data/stackset" | |
if args.clean and os.path.exists(path): | |
rmtree(path) | |
# generating report stuff | |
outcome = {} | |
for profile in profiles: | |
# TODO: handle botocore.exceptions.ProfileNotFound | |
response = collect(profile, region, target) | |
outcome[profile] = response | |
# saving report stuff | |
make_directory("account-data") | |
make_directory("account-data/stackset") | |
write_file(outcome, "account-data/stackset") | |
print_summary(outcome) | |
if __name__ == "__main__": | |
if len(sys.argv) <= 1: | |
print('usage: stack_outputs.py [-h] --target TARGET [--profiles PROFILES] [--clean]') | |
exit(-1) | |
run(sys.argv[1:]) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment