Skip to content

Instantly share code, notes, and snippets.

@rollwagen
Last active May 30, 2022 17:04
Show Gist options
  • Save rollwagen/5e03ee60755a6173ea0bd15e17deab96 to your computer and use it in GitHub Desktop.
Save rollwagen/5e03ee60755a6173ea0bd15e17deab96 to your computer and use it in GitHub Desktop.
Python script to query lambda runtimes across AWS accounts
# -*- coding: utf-8 -*-
"""
Python cli tool to query lambda runtimes for all lambda functions in
all AWS accounts in an (SSO-) AWS Organization
Requirements / pre-requisites:
- `pip install boto3 mypy-boto3 mypy-boto3-sts mypy-boto3-sso mypy-boto3-lambda boto3-stubs[essential,sso,lambda,sts]`
- AWS: need to be (sso-) logged in i.e. `aws sso login`, and have access to
organizations accounts either via "ViewOnlyAccess" or "AdministratorAccess" role
Examples:
$ python lambdaruntimequery.py
$ python lambdaruntimequery.py --output json | jq '.[] | .function_name'
# count nr of lambda function with runtime python 3.6 that are in accounts that have 'prod' in account name
$ python lambdaruntimequery.py --output json | jq '.[] | select(.account_name | contains("prod")) | .function_name' | wc -l
Options:
-h, --help Show help message
-r <name>, --runtime <name> Lambda runtime to filter for e.g. python3.6
-o [plain|json], --output [plain|json] Output format: plain (default) or json
"""
import sys
if not sys.version_info >= (3, 9):
print("Python >= 3.9 required")
exit(1)
import argparse
import dataclasses
import functools
import json
import os
import boto3
from mypy_boto3_lambda import LambdaClient
from mypy_boto3_sso import SSOClient
from mypy_boto3_sts import STSClient
from dataclasses import dataclass
@dataclass
class Lambda:
account_name: str
account_id: str
function_name: str
runtime: str
@functools.lru_cache
def _sso_token() -> str:
aws_sso_cache_dir = os.path.expanduser("~/.aws/sso/cache")
try:
cache_file = [f for f in os.listdir(aws_sso_cache_dir) if f[0].isdigit()][0]
cache_filepath = f"{aws_sso_cache_dir}/{cache_file}"
with open(cache_filepath, "r") as token_cache_file:
token_json = json.loads(token_cache_file.read())
access_token = token_json["accessToken"]
return access_token
except Exception as exception:
print(f"Could not get access token from SSO cache. {exception=}")
return ""
@functools.lru_cache
def _args() -> dict[str, str]:
parser = argparse.ArgumentParser()
parser.add_argument('-r', '--runtime', help='Lambda runtime to filter for e.g. python3.6', default="python3.6")
parser.add_argument('-o', '--output', help='Output format: plain (default) or json', default="plain")
args = vars(parser.parse_args())
return args
if __name__ == "__main__":
lambda_runtime_filter = _args()["runtime"]
lambdas: list[Lambda] = []
session = boto3.session.Session()
sso: SSOClient = session.client("sso") # noqa
sts: STSClient = session.client("sts") # noqa
lambdas_matching_filter = 0
for account in sso.list_accounts(accessToken=_sso_token(), maxResults=100)["accountList"]:
account_name = account["accountName"]
account_id = account["accountId"]
roles = sso.list_account_roles(accessToken=_sso_token(), accountId=account_id)
role = "ViewOnlyAccess" # use ViewOnly role per default; fallback: Admin role
if "ViewOnlyAccess" not in [role["roleName"] for role in roles["roleList"]]:
role = "AdministratorAccess"
credentials = sso.get_role_credentials(
roleName=role, accountId=account_id, accessToken=_sso_token()
)["roleCredentials"]
lambda_client: LambdaClient = boto3.client( # noqa
"lambda",
aws_access_key_id=credentials["accessKeyId"],
aws_secret_access_key=credentials["secretAccessKey"],
aws_session_token=credentials["sessionToken"],
)
for function in lambda_client.list_functions()["Functions"]:
function_name = function["FunctionName"]
runtime = function["Runtime"]
if runtime == lambda_runtime_filter:
lambdas.append(Lambda(account_name, account_id, function_name, runtime))
if _args()["output"] == "plain":
print("\n".join(
[f"account: {l_.account_name:{42}} id: {l_.account_id:{15}} function: {l_.function_name} [{l_.runtime}]"
for l_ in lambdas]))
print(f"Found {len(lambdas)} lambda functions")
elif _args()["output"] == "json":
print(json.dumps([dataclasses.asdict(lambda_) for lambda_ in lambdas]))
else:
print(f"Error: undefined output - {_args()['output']}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment