Skip to content

Instantly share code, notes, and snippets.

@andreswebs
Forked from sgtoj/aws_sso.py
Last active February 16, 2021 01:00
Show Gist options
  • Save andreswebs/34927dbe6325998efafd8b5ed9d91ee3 to your computer and use it in GitHub Desktop.
Save andreswebs/34927dbe6325998efafd8b5ed9d91ee3 to your computer and use it in GitHub Desktop.
AWS SSO Credentials external process
#!/usr/bin/env python3
import json
import os
import sys
import hashlib
from configparser import ConfigParser
from datetime import datetime
from pathlib import Path
import boto3
AWS_CONFIG_PATH = os.getenv('AWS_CONFIG_PATH') or f'{Path.home()}/.aws/config'
AWS_SSO_CACHE_PATH = os.getenv('AWS_SSO_CACHE_PATH') or f'{Path.home()}/.aws/sso/cache'
# -------------------------------------------------------------------- main ---
def profile_credentials(profile_name):
profile = get_aws_profile(profile_name)
cache_login = get_sso_cached_login(profile)
credentials = get_sso_role_credentials(profile, cache_login)
external_process = {}
external_process['Version'] = 1
external_process['AccessKeyId'] = credentials['accessKeyId']
external_process['SecretAccessKey'] = credentials['secretAccessKey']
external_process['SessionToken'] = credentials['sessionToken']
external_process['Expiration'] = convert_to_isotime(credentials['expiration'])
json.dump(external_process, sys.stdout, indent=2)
# --------------------------------------------------------------------- fns ---
def get_aws_profile(profile_name):
config = read_config(AWS_CONFIG_PATH)
profile_opts = config.items(f'profile {profile_name}')
profile = dict(profile_opts)
return profile
def get_sso_cached_login(profile):
url_hash = hashlib.sha1(profile['sso_start_url'].encode('utf-8'))
file_path = AWS_SSO_CACHE_PATH + '/' + url_hash.hexdigest() + '.json'
data = load_json(file_path)
time_now = datetime.now()
if (
data is None or
data.get('startUrl') != profile['sso_start_url'] or
data.get('region') != profile['sso_region'] or
time_now > parse_timestamp(data.get('expiresAt', '1970-01-01T00:00:00Z'))
):
raise Exception('Current cached SSO login is expired or invalid')
return data
def get_sso_role_credentials(profile, login):
client = boto3.client('sso', region_name=profile['sso_region'])
response = client.get_role_credentials(
roleName=profile['sso_role_name'],
accountId=profile['sso_account_id'],
accessToken=login['accessToken'],
)
return response['roleCredentials']
# ------------------------------------------------------------------- utils ---
def load_json(path):
try:
with open(path) as context:
return json.load(context)
except ValueError:
pass # ignore invalid json
def parse_timestamp(value):
return datetime.strptime(value, '%Y-%m-%dT%H:%M:%SZ')
def convert_to_isotime(ms):
dt = datetime.utcfromtimestamp(ms/1000)
iso_format = dt.isoformat() + '+00:00'
return iso_format
def read_config(path):
config = ConfigParser()
config.read(path)
return config
# ---------------------------------------------------------------- handlers ---
def script_handler(args):
profile_name = 'default'
if len(args) == 2:
profile_name = args[1]
profile_credentials(profile_name)
if __name__ == '__main__':
script_handler(sys.argv)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment