Skip to content

Instantly share code, notes, and snippets.

@jlinoff
Created February 2, 2022 00:31
Show Gist options
  • Save jlinoff/464423f9e8a3922f5458f4b256e6ca92 to your computer and use it in GitHub Desktop.
Save jlinoff/464423f9e8a3922f5458f4b256e6ca92 to your computer and use it in GitHub Desktop.
Report all of the AWS service or resource types present in your accounts and regions from the command line (CLI).
#!/usr/bin/env python3
'''
Report all of the service or resource types present in accounts and
regions.
It generates six different reports:
1. diff the services between two accounts.
2. diff the resources between two accounts.
3. summary of services present in one or more accounts.
4. summary of resources present in one or more accounts.
5. complete ARN report of services present in one or more accounts.
6. complete ARN of resources present in one or more accounts.
Each account is specified by its profile name and region separated by
a colon. You can specify as many accounts as you like. See help (`-h`)
for more information.
The tool supports filtering so you can limit the scope of the
reports.
You can specify accept criteria using `-a`, like this to see
only the lambdas: `-a 'arn:aws:lambda:'`.
In addition, you can specify reject criteria using `-R` to see
everything _except_ the lambdas: `-R 'arn:aws:lambda:'`. See the
online help (`-h`) for more information.
This tool uses the "resourcegroupstaggingapi" CLI service.
Although it works well for accounts and regions, it is not clear
that this service works for global services or resources.
It assumes that the user is logged into the account using a profile.
To use this tool in pipenv do this:
$ pipenv install --python python3 types-boto3 boto3 pylint mypy
$ pipenv run ./aws-inventory.py -h
$ pipenv run pylint ./aws-inventory.py
$ pipenv run mypy ./aws-inventory.py
You will need to have profiles and be logged in to use this tool.
'''
import argparse
import datetime
import inspect
import os
import re
import sys
from typing import Any, Dict, Tuple
import boto3
__version__ = '1.0.0'

# Fallback profile and region, read from the environment. They are used
# when an account argument leaves its profile or region part empty.
AWS_PROFILE = os.environ.get('AWS_PROFILE', '')
AWS_REGION = os.environ.get('AWS_REGION', '')

# Progress display tuning for verbose collection.
RPD = int(os.environ.get('RPD', '100'))  # records per dot
DPL = int(os.environ.get('DPL', '100'))  # dots per line
def info(msg: str, level: int = 1) -> None:
    '''
    Output an info message to stdout.

    The message is prefixed with "INFO:" and the line number of the
    calling frame, matching the WARNING:/ERROR: prefixes used by the
    sibling reporting functions.

    Args:
        msg: The message text.
        level: Index into inspect.stack() of the frame whose line number
            is reported (1 is the immediate caller).
    '''
    lnum = inspect.stack()[level].lineno
    print(f'INFO:{lnum}: {msg}')
def warn(msg: str, level: int = 1) -> None:
    '''
    Write a warning to stderr, wrapped in ANSI red escape codes.

    Args:
        msg: The warning text.
        level: Index into inspect.stack() of the frame whose line number
            is reported (1 is the immediate caller).
    '''
    caller = inspect.stack()[level]
    text = f'\033[31mWARNING:{caller.lineno}: {msg}\033[0m'
    print(text, file=sys.stderr)
def err(msg: str, level: int = 1, xcode: int = 1, abort: bool = True) -> None:
    '''
    Write an error to stderr (in ANSI red) and, unless told otherwise,
    terminate the program.

    Args:
        msg: The error text.
        level: Index into inspect.stack() of the frame whose line number
            is reported (1 is the immediate caller).
        xcode: The exit code passed to sys.exit().
        abort: When False the message is printed and the call returns.
    '''
    caller = inspect.stack()[level]
    print(f'\033[31mERROR:{caller.lineno}: {msg}\033[0m', file=sys.stderr)
    if not abort:
        return
    sys.exit(xcode)
def getopts() -> argparse.Namespace:
    '''
    Command line options.

    Builds the argument parser, parses sys.argv and makes the summary
    report the default when neither -l nor -s was given.

    @returns The command line options.
    '''
    def gettext(string):
        '''
        Convert to upper case to make things consistent.
        '''
        lookup = {
            'usage: ': 'USAGE:',
            'positional arguments': 'POSITIONAL ARGUMENTS',
            'optional arguments': 'OPTIONAL ARGUMENTS',
            # Python 3.10+ renamed the "optional arguments" heading.
            'options': 'OPTIONAL ARGUMENTS',
            'show this help message and exit': 'Show this help message and exit.\n ',
        }
        return lookup.get(string, string)

    argparse._ = gettext  # type: ignore  # to capitalize help headers
    base = os.path.basename(sys.argv[0])
    # f-string is required here, otherwise "{base}" is printed literally.
    usage = f'\n {base} [OPTIONS] ACCOUNTS'
    # __doc__ is None when docstrings are stripped (python -OO).
    doc = __doc__ or ''
    desc = 'DESCRIPTION:{0}'.format('\n '.join(doc.split('\n'))).rstrip()
    epilog = fr'''
EXAMPLES:
# Example 1: help
$ {base} -h
# Example 2: Summary of the first 1,000 services.
$ {base} -Ss -m 1000 -v int:eu-west-1
# Example 3: Summary of the first 1,000 resources.
$ {base} -s -m 1000 -v int:eu-west-1
# Example 4: Summary of services for two accounts.
$ {base} -Ss -v work:eu-west-1 test:eu-west-1
# Example 5: Summary of services for three accounts.
$ {base} -Ssv work:eu-west-1 test:eu-west-1 green:eu-west-1
# Example 6: Summary of resources for three accounts.
$ {base} -sv work:eu-west-1 test:eu-west-1 green:eu-west-1
# Example 7: Summary of resource differences between two accounts.
$ {base} -sdv blue:eu-west-1 green:eu-west-1
LICENSE
MIT Open Source
VERSION
{base} {__version__}
'''
    afc = argparse.RawTextHelpFormatter
    parser = argparse.ArgumentParser(formatter_class=afc,
                                     description=desc,
                                     usage=usage,
                                     epilog=epilog.rstrip() + '\n ')
    parser.add_argument('-a', '--accept',
                        action='store',
                        default='',
                        help='''\
Accept ARN by pattern.
This is a regular expression.
The default is to accept all ARNs.
ex: -a '^arn:aws:lambda|^arn:aws:session'
''')
    parser.add_argument('-d', '--diff',
                        action='store_true',
                        default=False,
                        help='''\
Report the differences between the first
profile/region and all of the others.
''')
    parser.add_argument('-m', '--max',
                        action='store',
                        type=int,
                        default=0,
                        help='''\
The maximum number of resources to collect.
This is mainly for debugging.
''')
    parser.add_argument('-l', '--long',
                        action='store_true',
                        help='''\
The long report. Lists all of the ARNs.
For a complete report with the summary
and ARNs, specify '-ls'.
''')
    parser.add_argument('-n', '--name-max',
                        action='store',
                        type=int,
                        default=0,
                        help='''\
The maximum number of characters
in a resource or service name.
This is mainly for debugging.
''')
    parser.add_argument('-R', '--reject',
                        action='store',
                        default='',
                        help='''\
Reject ARN by pattern.
This is a regular expression.
The default is to reject no ARNs.
ex: -R '^arn:aws:lambda|^arn:aws:session'
''')
    parser.add_argument('-s', '--summary',
                        action='store_true',
                        help='''\
Summary only.
This is the default if neither -l or -s is specified.
Do not list all of the ARNs individually.
''')
    parser.add_argument('-S', '--service-only',
                        action='store_true',
                        help='''\
Services-only summary.
Do not list all of the resource-types.
''')
    parser.add_argument('-v', '--verbose',
                        action='count',
                        default=0,
                        help='''\
Increase the level of verbosity.
''')
    parser.add_argument('-V', '--version',
                        action='version',
                        # f-string so that the real version number is shown;
                        # %(prog)s is still expanded by argparse.
                        version=f'%(prog)s version {__version__}',
                        help='''\
Show program's version number and exit.
''')
    parser.add_argument('ACCOUNTS',
                        nargs='*',
                        help='''\
The AWS account profile name and region separated
by a colon.
''')
    opts = parser.parse_args()
    # The summary is the default report.
    if not opts.summary and not opts.long:
        opts.summary = True
    return opts
def parse_accounts(opts: argparse.Namespace) -> list:
    '''Populate the account profiles and regions.

    Each ACCOUNTS argument has the form "profile:region". Either part
    may be empty (and the colon may be omitted when only a profile is
    given), in which case AWS_PROFILE / AWS_REGION supply the value.
    When no ACCOUNTS are given at all, a single account is built
    entirely from AWS_PROFILE and AWS_REGION.

    The AWS account id of each profile is looked up through STS, so the
    user must already be logged in.

    Args:
        opts: Command line options.
    Returns:
        accounts: A list of dictionaries, one per account, each with the
            keys 'profile', 'region' and 'aid'.
    '''
    accounts = []
    # ':' means "take both parts from the environment"; it makes the
    # no-argument case go through the same validation and STS lookup so
    # that every record carries an 'aid'.
    for rec in opts.ACCOUNTS or [':']:
        if rec.count(':') > 1:
            err(f'invalid profile argument, too many colons: {rec}')
        # partition() tolerates a missing colon, unlike a two-way unpack
        # of split(':').
        profile, _, region = rec.partition(':')
        profile = profile or AWS_PROFILE
        region = region or AWS_REGION
        if not profile:
            err(f'missing profile in "{rec}" and AWS_PROFILE is not defined')
        if not region:
            err(f'missing region in "{rec}" and AWS_REGION is not defined')
        session = boto3.Session(profile_name=profile)
        sts = session.client('sts', region_name=region)
        aid = sts.get_caller_identity().get('Account', '')
        accounts.append({'profile': profile,
                         'region': region,
                         'aid': aid})
    return accounts
def _classify_arn(arn: str) -> Tuple[str, str]:
    '''Split an ARN into its service name and resource type.

    Handled layouts:
        arn:partition:service:region:account-id:resource-type:resource-id
        arn:partition:service:region:account-id:resource-type/resource-id
        arn:partition:service:region:account-id:resource-id

    The service name is used as the resource type when the ARN carries
    no usable type, e.g. arn:aws:s3:::bucket-name or
    arn:aws:apigateway:eu-west-1::/usageplans/zghz7h.

    Args:
        arn: The ARN string.
    Returns:
        The (service, resource_type) pair.
    '''
    flds = arn.split(':', 6)
    service = flds[2]
    resource_type = ''
    if len(flds) > 6:
        resource_type = flds[5].split('/')[0]
    elif len(flds) > 5 and '/' in flds[5]:
        resource_type = flds[5].split('/')[0]
    return service, resource_type or service


def collect(  # pylint: disable=too-many-branches,too-many-locals,too-many-statements
        opts: argparse.Namespace,
        profile: str,
        region: str
) -> Tuple[dict, dict, int]:
    '''Collect the resource data for one profile/region.

    Pages through the "resourcegroupstaggingapi" get_resources results,
    applies the accept/reject filters and groups the surviving entries.

    Args:
        opts: Command line options.
        profile: The account profile name.
        region: The account region.
    Returns:
        arns: A dictionary of the collected ARN records.
        by_types: The ARNs organized by service name or resource type.
        total: The total number of records processed before filtering.
    '''
    by_types: Dict[str, Any] = {}
    arns: Dict[str, Any] = {}
    seen_tokens = set()
    accept = re.compile(opts.accept) if opts.accept else None
    reject = re.compile(opts.reject) if opts.reject else None
    session = boto3.Session(profile_name=profile)
    client = session.client('resourcegroupstaggingapi', region_name=region)
    total = 0
    if opts.verbose:
        info(f'collecting resource data from "{profile}" in "{region}"')

    token = ''
    done = False
    while not done:  # pylint: disable=too-many-nested-blocks
        if token:
            resources = client.get_resources(PaginationToken=token)
        else:
            resources = client.get_resources()

        # Process the page BEFORE looking at its pagination token: the
        # final page (or the only page) comes back with an empty token
        # and must not be dropped.
        for entry in resources.get('ResourceTagMappingList', []):
            # See if we hit the max (--max).
            if 0 < opts.max <= len(arns):
                done = True
                break
            arn = entry['ResourceARN']
            total += 1
            assert arn not in arns

            # Print out the verbose progress messages.
            # Note that len(arns) != total when accept/reject filtering is used.
            if opts.verbose and RPD and total % RPD == 0:
                print('.', flush=True, end='')
                if DPL and total % (RPD * DPL) == 0:
                    print(f' {total:>8} {len(arns):>8}\n', flush=True)

            # Figure out the service and resource-type.
            service, resource_type = _classify_arn(arn)
            if '/' in resource_type:
                err('internal resource parsing error!\n'
                    f'{arn}\n{resource_type}:{arn.split(":", 6)}')

            if opts.service_only:
                rtype = f'{service}'
            else:
                rtype = f'{service}:{resource_type}'
            if opts.name_max:
                # Keep at most --name-max leading characters.
                rtype = rtype[:opts.name_max]

            # Filter.
            if accept and not accept.search(arn):
                continue
            if reject and reject.search(arn):
                continue

            # Update the collections.
            by_types.setdefault(rtype, []).append(entry)
            arns[arn] = entry

        # Stop on the last page, or if the service hands back a token
        # that was already used (guards against an endless loop).
        token = resources.get('PaginationToken', '')
        if not token or token in seen_tokens:
            done = True
        else:
            seen_tokens.add(token)

    if opts.verbose == 1:
        print(f' {total:>8} {len(arns):>8}\n', flush=True)
    return arns, by_types, total
def collect_from_accounts(opts: argparse.Namespace, accounts: list) -> Tuple[list, dict]:
    '''Collect data for all accounts.

    Args:
        opts: Command line options.
        accounts: The accounts to process, as returned by
            parse_accounts(). When empty or None the accounts are parsed
            from the command line options instead.
    Returns:
        recs: One record per account with the keys 'profile', 'region',
            'aid', 'cwidth' (report column width), 'arns', 'by_types'
            and 'total'.
        cols: Maps each service/resource type to the number of accounts
            in which it was found.
    '''
    # Use the accounts that were passed in; re-parsing them would repeat
    # the STS lookup for every account.
    if not accounts:
        accounts = parse_accounts(opts)
    recs = []
    cols: Dict[str, int] = {}
    for account in accounts:
        profile = account['profile']
        region = account['region']
        aid = account.get('aid') or ''
        arns, by_types, total = collect(opts, profile, region)
        # The column must fit every header cell and an 8 digit count.
        cwidth = max(len(profile), len(region), len(aid), 8)
        recs.append({
            'profile': profile,
            'region': region,
            'aid': aid,
            'cwidth': cwidth,
            'arns': arns,
            'by_types': by_types,
            'total': total,
        })
        for key in sorted(by_types, key=str.lower):
            cols[key] = cols.get(key, 0) + 1
        if opts.verbose:
            info(f'collected {len(arns)} resources for "{profile}" in "{region}" ({aid})')
    return recs, cols
def summary(  # pylint: disable=too-many-branches,too-many-locals
        opts: argparse.Namespace,
        recs: list,
        cols: dict,
        today: str
) -> None:
    '''Summary report.

    Prints one row per service/resource type and one column per account
    with the number of matching resources. With --diff every account
    after the first gets an extra column holding the difference to the
    first account.

    Args:
        opts: Command line options.
        recs: Collected, transformed records.
        cols: The column definitions.
        today: The report date.
    '''
    if opts.verbose:
        info('summary report')
    # Width of the first (row label) column.
    klen = max(9, max((len(x) for x in cols), default=0))

    def header_row(label: str, field: str, diff_label: str = '') -> None:
        '''Print one header line showing rec[field] for every account.'''
        cells = [f'{label:<{klen}}']
        for i, rec in enumerate(recs):
            if i and opts.diff:
                cells.append(f' {diff_label:<8}')
            cells.append(f' {rec[field]:<{rec["cwidth"]}}')
        print(''.join(cells))

    print('')
    if opts.service_only:
        print(f'Account Service Summary Report {today}')
    else:
        print(f'Account Resource Summary Report {today}')
    if opts.accept:
        print(f'Accept Pattern: "{opts.accept}"')
    if opts.reject:
        print(f'Reject Pattern: "{opts.reject}"')
    print('')

    # Header lines 1-3: account id, profile, region.
    header_row('', 'aid')
    header_row('', 'profile')
    header_row('Service' if opts.service_only else 'Resource', 'region', 'diff')

    # Header line 4: separators.
    print('=' * klen, end='')
    for i, rec in enumerate(recs):
        if i and opts.diff:
            print(' ' + '=' * 8, end='')
        print(' ' + '=' * rec['cwidth'], end='')
    print('')

    # Count for each resource.
    base_types = recs[0]['by_types'] if recs else {}
    for key in sorted(cols, key=str.lower):
        print(f'{key:<{klen}}', end='')
        for i, rec in enumerate(recs):
            cwidth = rec['cwidth']
            count = len(rec['by_types'].get(key, []))
            if i == 0 or not opts.diff:
                print(f' {count:>{cwidth}}', end='', flush=True)
            else:
                # Difference relative to the first account.
                diff = count - len(base_types.get(key, []))
                print(f' {diff:>8} {count:>{cwidth}}', end='', flush=True)
        print('')

    # Totals row.
    print(f'{"TOTALS":<{klen}}', end='')
    for i, rec in enumerate(recs):
        cwidth = rec['cwidth']
        count = sum(len(b) for b in rec['by_types'].values())
        if i and opts.diff:
            print(f' {"":<8}', end='')
        print(f' {count:>{cwidth}}', end='', flush=True)
    print('')
    print('')
def report_arns(opts: argparse.Namespace, recs: list, today: str):
    '''Print every collected ARN once, across all accounts.

    The ARNs of all records are merged, de-duplicated and listed in
    case-insensitive order below a report heading.

    Args:
        opts: Command line options.
        recs: Collected, transformed records.
        today: The report date.
    '''
    if opts.verbose:
        info('arn report')
    heading = (f'Account Service ARN Report {today}\n'
               if opts.service_only
               else f'Account Resource ARN Report {today}\n')
    print('')
    print(heading)
    # A dict is used as an insertion-ordered set of ARNs.
    merged = {}
    for rec in recs:
        merged.update(dict.fromkeys(sorted(rec['arns'], key=str.lower), True))
    for arn in sorted(merged, key=str.lower):
        print(f'{arn}')
def main():
    '''
    Program entry point.

    Parses the command line, collects the data for every account and
    prints the summary and/or the ARN report that was requested.
    '''
    now = datetime.datetime.now()
    today = now.isoformat(timespec="seconds")
    opts = getopts()
    recs, cols = collect_from_accounts(opts, parse_accounts(opts))
    if opts.summary:
        summary(opts, recs, cols, today)
    if opts.long:
        report_arns(opts, recs, today)
    if opts.verbose:
        info('done')


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment