Skip to content

Instantly share code, notes, and snippets.

@rdkls
Created February 21, 2020 04:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rdkls/b1ef45b67d900b4a319229dd815f36c6 to your computer and use it in GitHub Desktop.
Save rdkls/b1ef45b67d900b4a319229dd815f36c6 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import argparse
import boto3
import botocore
import sys
import csv
from itertools import repeat as r
import concurrent.futures
REGION_NAME_DEFAULT = 'ap-southeast-2'
TPL_ROLE_TO_ASSUME = 'arn:aws:iam::{account_id}:role/intelematics/Administrator'
ASSUMED_SESSION_NAME = 'cloud_platforms_team_iam_check'
TPL_CONSOLE_LINK = 'https://{region_name}.console.aws.amazon.com/rds/home?region={region_name}#database:id={instance_id};is-cluster=false'
parser = argparse.ArgumentParser()
parser.add_argument('--format', default='text')
args = parser.parse_args()
def get_region_names():
# If dynamic is too long to iterate through
# return ["ap-southeast-2", "us-west-2", "eu-west-1", "us-east-1"]
ec2 = boto3.client('ec2', REGION_NAME_DEFAULT)
regions = ec2.describe_regions()
region_names = list(map(lambda x: x['RegionName'], regions['Regions']))
return region_names
def assume_role(from_session, to_arn):
sts = from_session.client("sts")
res = sts.assume_role(RoleArn=to_arn, RoleSessionName='tmp')
assumed_session = from_session.session.Session(
aws_access_key_id=res["Credentials"]["AccessKeyId"],
aws_secret_access_key=res["Credentials"]["SecretAccessKey"],
aws_session_token=res["Credentials"]["SessionToken"],
)
return assumed_session
def get_all_data_one_account(account, client_name, region_names, method_name, field_name, filters=None) -> list:
all_data_all_regions = []
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
all_data_all_regions = list(executor.map(
get_all_data_one_account_one_region,
r(account['Id']),
r(client_name),
region_names,
r(method_name),
r(field_name),
r(filters),
))
all_data_all_regions = sum(all_data_all_regions, [])
for a in all_data_all_regions:
a['AccountId'] = account['Id']
a['AccountName'] = account['Name']
return all_data_all_regions
def get_all_data_one_account_one_region(account_id, client_name, region_name, method_name, field_name, filters=None) -> list:
to_arn = TPL_ROLE_TO_ASSUME.format(account_id=account_id)
session = assume_role(boto3, to_arn)
client = session.client(client_name, region_name=region_name)
all_data_one_region = []
try:
for page in client.get_paginator(method_name).paginate(**{'Filters': filters}):
items = page[field_name]
for item in items:
item['RegionName'] = region_name
all_data_one_region += items
except botocore.exceptions.ClientError:
# Not allowed (in this region by permissions boundary) - skip
pass
return all_data_one_region
def get_accounts():
organizations = boto3.client('organizations')
accounts = []
for page in organizations.get_paginator('list_accounts').paginate():
accounts += page['Accounts']
accounts = filter(lambda x: 'ACTIVE' == x['Status'], accounts)
return accounts
if '__main__' == __name__:
csvwriter = csv.writer(sys.stdout)
if 'csv' == args.format:
csvwriter.writerow([
'account_id',
'account_name',
'region',
'instance_id',
'db_name',
'CACertificateIdentifier',
'console_link',
])
accounts = get_accounts()
filters = []
region_names = get_region_names()
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
all_data = list(executor.map(
get_all_data_one_account,
accounts,
r('rds'),
r(region_names),
r('describe_db_instances'),
r('DBInstances'),
r(filters),
))
all_data = sum(all_data, [])
# Only get active instances - not easy (possible?) to do with --filter
all_data = filter(lambda x: x['DBInstanceStatus'] == 'available', all_data)
# Dedupe
deduped_data = []
for data in all_data:
if data['DBInstanceIdentifier'] not in [x['DBInstanceIdentifier'] for x in deduped_data]:
deduped_data.append(data)
# Add instances using SGs
for data in deduped_data:
csvwriter.writerow([
data['AccountId'],
data['AccountName'],
data['RegionName'],
data['DBInstanceIdentifier'],
data.get('DBName', '(no name)'),
data['CACertificateIdentifier'],
TPL_CONSOLE_LINK.format(region_name=data['RegionName'], instance_id=data['DBInstanceIdentifier']),
])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment