Last active
May 30, 2022 17:04
-
-
Save rollwagen/5e03ee60755a6173ea0bd15e17deab96 to your computer and use it in GitHub Desktop.
Python script to query lambda runtimes across AWS accounts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
""" | |
Python cli tool to query lambda runtimes for all lambda functions in | |
all AWS accounts in an (SSO-) AWS Organization | |
Requirements / pre-requisites: | |
- `pip install boto3 mypy-boto3 mypy-boto3-sts mypy-boto3-sso mypy-boto3-lambda boto3-stubs[essential,sso,lambda,sts]` | |
- AWS: you must be logged in via SSO (i.e. run `aws sso login` first) and have access to
the organization's accounts through either the "ViewOnlyAccess" or the "AdministratorAccess" role
Examples: | |
$ python lambdaruntimequery.py | |
$ python lambdaruntimequery.py --output json | jq '.[] | .function_name' | |
# count the Lambda functions with runtime python3.6 (the default --runtime filter) in accounts whose name contains 'prod'
$ python lambdaruntimequery.py --output json | jq '.[] | select(.account_name | contains("prod")) | .function_name' | wc -l | |
Options: | |
-h, --help Show help message | |
-r <name>, --runtime <name> Lambda runtime to filter for e.g. python3.6 | |
-o [plain|json], --output [plain|json] Output format: plain (default) or json | |
""" | |
import sys | |
# Fail fast on interpreters older than 3.9: the rest of the file uses built-in
# generic annotations such as ``dict[str, str]`` that are evaluated at
# definition time and would otherwise fail with a less helpful error.
if sys.version_info < (3, 9):
    # Report on stderr and use sys.exit(): the bare ``exit`` helper is injected
    # by the ``site`` module and is not guaranteed to exist (e.g. ``python -S``).
    print("Python >= 3.9 required", file=sys.stderr)
    sys.exit(1)
import argparse | |
import dataclasses | |
import functools | |
import json | |
import os | |
import boto3 | |
from mypy_boto3_lambda import LambdaClient | |
from mypy_boto3_sso import SSOClient | |
from mypy_boto3_sts import STSClient | |
from dataclasses import dataclass | |
@dataclasses.dataclass
class Lambda:
    """Record describing one Lambda function and the account it lives in.

    Instances are plain data holders; they are converted with
    ``dataclasses.asdict`` for the JSON output and formatted field by field
    for the plain-text output.
    """

    # Human readable name of the AWS account that owns the function.
    account_name: str
    # Identifier of the AWS account that owns the function.
    account_id: str
    # Name of the Lambda function.
    function_name: str
    # Runtime identifier reported for the function, e.g. "python3.6".
    runtime: str
@functools.lru_cache
def _sso_token() -> str:
    """Return the SSO access token stored in the local AWS SSO cache.

    Every file in ``~/.aws/sso/cache`` is inspected and the ``accessToken`` of
    the most recently modified file that contains one is returned.  Files are
    selected by content rather than by name: the previous "file name starts
    with a digit" heuristic skipped token files whose (hex-like) name begins
    with a letter and gave up if the first matching file held no token.

    The result is cached for the lifetime of the process, so the cache
    directory is read at most once.

    Returns:
        The access token, or an empty string when no token could be found
        (a diagnostic is printed in that case).
    """
    aws_sso_cache_dir = os.path.expanduser("~/.aws/sso/cache")
    try:
        newest_mtime = None
        access_token = ""
        for file_name in os.listdir(aws_sso_cache_dir):
            cache_filepath = os.path.join(aws_sso_cache_dir, file_name)
            try:
                with open(cache_filepath, "r", encoding="utf-8") as token_cache_file:
                    token_json = json.load(token_cache_file)
            except (OSError, ValueError):
                # Unreadable entries, sub-directories and non-JSON files are
                # simply not token files; keep looking.
                continue
            if not isinstance(token_json, dict):
                continue
            candidate = token_json.get("accessToken")
            if not isinstance(candidate, str) or not candidate:
                # e.g. a client-registration file: valid JSON, but no token.
                continue
            modified = os.path.getmtime(cache_filepath)
            if newest_mtime is None or modified > newest_mtime:
                newest_mtime, access_token = modified, candidate
        if not access_token:
            raise FileNotFoundError(
                f"no cache file with an accessToken found in {aws_sso_cache_dir}"
            )
        return access_token
    except OSError as exception:
        # Written to stderr so the message cannot corrupt `--output json`
        # when stdout is piped into another tool.
        print(f"Could not get access token from SSO cache. {exception=}", file=sys.stderr)
        return ""
@functools.lru_cache
def _args() -> dict[str, str]:
    """Parse the command line once and return the options as a dict.

    The result is cached, so repeated calls return the same dict without
    re-parsing ``sys.argv``.

    Returns:
        A dict with the keys ``"runtime"`` (Lambda runtime to filter for,
        default ``"python3.6"``) and ``"output"`` (``"plain"`` or ``"json"``,
        default ``"plain"``).

    Raises:
        SystemExit: raised by argparse for ``-h/--help`` or invalid arguments,
            including an ``--output`` value other than ``plain``/``json``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-r",
        "--runtime",
        help='Lambda runtime to filter for e.g. python3.6',
        default="python3.6",
    )
    # `choices` makes argparse reject an unsupported format immediately,
    # instead of the script discovering it only after all accounts were queried.
    parser.add_argument(
        "-o",
        "--output",
        help='Output format: plain (default) or json',
        choices=("plain", "json"),
        default="plain",
    )
    return vars(parser.parse_args())
def _role_for_account(sso: "SSOClient", account_id: str) -> str:
    """Return the SSO role name to use for `account_id`.

    "ViewOnlyAccess" is preferred; when the account does not offer it,
    "AdministratorAccess" is used as fallback.  All pages of the role listing
    are read so the preferred role is not missed on a later page.
    """
    role_names = set()
    role_pages = sso.get_paginator("list_account_roles").paginate(
        accessToken=_sso_token(), accountId=account_id
    )
    for role_page in role_pages:
        role_names.update(entry["roleName"] for entry in role_page["roleList"])
    return "ViewOnlyAccess" if "ViewOnlyAccess" in role_names else "AdministratorAccess"


def _lambdas_in_account(sso: "SSOClient", account: dict, runtime_filter: str) -> "list[Lambda]":
    """Return all Lambda functions in `account` whose runtime equals `runtime_filter`.

    Temporary role credentials are obtained through SSO and used to create a
    Lambda client.  No region is passed to the client, so whatever region
    boto3 resolves by default is the one that gets queried.
    """
    account_name = account["accountName"]
    account_id = account["accountId"]
    credentials = sso.get_role_credentials(
        roleName=_role_for_account(sso, account_id),
        accountId=account_id,
        accessToken=_sso_token(),
    )["roleCredentials"]
    lambda_client: LambdaClient = boto3.client(  # noqa
        "lambda",
        aws_access_key_id=credentials["accessKeyId"],
        aws_secret_access_key=credentials["secretAccessKey"],
        aws_session_token=credentials["sessionToken"],
    )
    matches = []
    # A single list_functions call returns only one page of results; the
    # paginator follows the continuation marker until every function was seen.
    for function_page in lambda_client.get_paginator("list_functions").paginate():
        for function in function_page["Functions"]:
            # .get(): not every function reports a "Runtime" (e.g. functions
            # deployed as container images); those can never match the filter.
            runtime = function.get("Runtime")
            if runtime == runtime_filter:
                matches.append(
                    Lambda(account_name, account_id, function["FunctionName"], runtime)
                )
    return matches


def main() -> None:
    """Query all SSO-accessible accounts and print the matching Lambda functions.

    The runtime filter and output format are taken from the command line
    (see `_args`).  Output is written to stdout, either as aligned plain text
    followed by a summary line, or as a JSON array of `Lambda` records.
    """
    lambda_runtime_filter = _args()["runtime"]
    output_format = _args()["output"]
    # Validate before doing any (slow) AWS calls rather than after them.
    if output_format not in ("plain", "json"):
        print(f"Error: undefined output - {output_format}", file=sys.stderr)
        sys.exit(1)

    session = boto3.session.Session()
    sso: SSOClient = session.client("sso")  # noqa

    lambdas: list[Lambda] = []
    # Paginate so organizations with more accounts than fit in one page
    # (the original requested a single page of 100) are covered completely.
    account_pages = sso.get_paginator("list_accounts").paginate(
        accessToken=_sso_token(), PaginationConfig={"PageSize": 100}
    )
    for account_page in account_pages:
        for account in account_page["accountList"]:
            lambdas.extend(_lambdas_in_account(sso, account, lambda_runtime_filter))

    if output_format == "plain":
        print("\n".join(
            [f"account: {l_.account_name:{42}} id: {l_.account_id:{15}} function: {l_.function_name} [{l_.runtime}]"
             for l_ in lambdas]))
        print(f"Found {len(lambdas)} lambda functions")
    else:
        print(json.dumps([dataclasses.asdict(lambda_) for lambda_ in lambdas]))


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment