Skip to content

Instantly share code, notes, and snippets.

@jameswhite
Forked from JoeyG1973/aws_saml.py
Created October 4, 2019 15:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jameswhite/de1bbb27a8881866c4cabbc13856369c to your computer and use it in GitHub Desktop.
Save jameswhite/de1bbb27a8881866c4cabbc13856369c to your computer and use it in GitHub Desktop.
aws saml login with session that auto refreshes.
# Took this:
# https://s3.amazonaws.com/awsiammedia/public/sample/SAMLAPICLIADFS/samlapi_formauth_adfs3.py
# converted to boto3 and smooshed it together with this:
# https://gist.github.com/kapilt/ac8e222081f63ba64e93
# which gave birth to this:
import base64
import logging
import re
import sys
import xml.etree.ElementTree as ET

try:
    # Python 3 location of the URL helpers
    from urllib.parse import urljoin
    from urllib.parse import urlparse
except ImportError:
    # Python 2 fallback
    from urlparse import urljoin
    from urlparse import urlparse

import boto3
import botocore
import requests
from bs4 import BeautifulSoup
# JJG
# Silence the InsecureRequestWarning that urllib3 emits for every HTTPS
# request made with certificate verification turned off.
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
# Older botocore releases bundled their own copy of requests/urllib3, which
# has to be silenced separately.  That vendored copy may be missing or reduced
# to a stub in newer botocore releases, so this is best-effort: when it is not
# available there is nothing to silence and the module must still import.
try:
    from botocore.vendored.requests.packages.urllib3.exceptions import InsecureRequestWarning
    botocore.vendored.requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
except (ImportError, AttributeError):
    pass
# JJG
# JJG
# Silence the UserWarnings raised from the bs4 module (e.g. when BeautifulSoup
# is constructed without an explicit parser).
import warnings
warnings.filterwarnings("ignore", category=UserWarning, module='bs4')
# JJG
##########################################################################
# Variables
# These module-level values are the default argument values of aws_session();
# callers can override each of them per call.
# region: The default AWS region that this script will connect
# to for all API calls
defaultRegion = 'us-east-1'
# SSL certificate verification: Whether or not strict certificate
# verification is done, False should only be used for dev/test
# NOTE(review): the shipped default is False, i.e. certificate verification is
# OFF unless the caller passes sslverification=True, even though the username
# and password are POSTed to the IdP. Consider defaulting to True.
defaultSslVerification = False
# idpentryurl: The initial url that starts the authentication process.
# The host below is a placeholder and must be replaced with the real IdP.
defaultIdpEntryUrl = 'https://ssov3.yourcorp.com/adfs/ls/IdpInitiatedSignOn?loginToRp=urn:amazon:webservices'
# Uncomment to enable low level debugging
#logging.basicConfig(level=logging.DEBUG)
##########################################################################
def _build_login_payload(formsoup, username, password):
    """Return the POST data for the IdP login form found in *formsoup*.

    Every <input> tag contributes one entry keyed by its ``name`` attribute.
    Fields whose name contains "user" or "email" receive *username*, fields
    whose name contains "pass" receive *password*, and everything else
    (typically hidden fields) keeps the value already present in the form.
    """
    payload = {}
    for inputtag in formsoup.find_all(re.compile('(INPUT|input)')):
        name = inputtag.get('name', '')
        value = inputtag.get('value', '')
        lowered = name.lower()
        if "user" in lowered or "email" in lowered:
            # Educated guess: this is the username field (some IdPs label
            # it 'email').
            payload[name] = username
        elif "pass" in lowered:
            # Educated guess: this is the password field.
            payload[name] = password
        else:
            payload[name] = value
    return payload


def _resolve_form_submit_url(formsoup, idpentryurl, fallbackurl):
    """Return the URL the login form must be POSTed to.

    If a <form id="loginForm"> with an ``action`` exists, the action is
    resolved against *idpentryurl*; otherwise *fallbackurl* is returned.
    """
    submiturl = fallbackurl
    for formtag in formsoup.find_all(re.compile('(FORM|form)')):
        action = formtag.get('action')
        if action and formtag.get('id') == "loginForm":
            # urljoin gives scheme://host + action for the usual
            # root-relative action and also copes with absolute or
            # path-relative actions.
            submiturl = urljoin(idpentryurl, action)
    return submiturl


def _extract_saml_assertion(soup):
    """Return the base64 SAML assertion from the IdP response page.

    Raises RuntimeError when the page has no non-empty SAMLResponse input,
    which is what a failed login looks like.
    """
    assertion = ''
    for inputtag in soup.find_all('input'):
        if inputtag.get('name') == 'SAMLResponse':
            assertion = inputtag.get('value')
    if not assertion:
        raise RuntimeError('Response did not contain a valid SAML assertion')
    return assertion


def _parse_saml_roles(assertion):
    """Return the "role_arn,principal_arn" strings granted by *assertion*."""
    root = ET.fromstring(base64.b64decode(assertion))
    awsroles = []
    for saml2attribute in root.iter('{urn:oasis:names:tc:SAML:2.0:assertion}Attribute'):
        if saml2attribute.get('Name') != 'https://aws.amazon.com/SAML/Attributes/Role':
            continue
        for saml2attributevalue in saml2attribute.iter('{urn:oasis:names:tc:SAML:2.0:assertion}AttributeValue'):
            chunks = saml2attributevalue.text.split(',')
            # The value should be role_arn,principal_arn but many setups
            # emit principal_arn,role_arn; put the role first if needed.
            if 'saml-provider' in chunks[0]:
                awsroles.append(chunks[1] + ',' + chunks[0])
            else:
                awsroles.append(saml2attributevalue.text)
    return awsroles


def _select_role(awsroles, rolearnselection):
    """Return (role_arn, principal_arn) for the role matching the selection.

    *rolearnselection* is matched as a substring of each entry; when several
    entries match, the last one is used.  Raises ValueError if none match.
    """
    selected = None
    for awsrole in awsroles:
        if rolearnselection in awsrole:
            selected = awsrole
    if selected is None:
        raise ValueError(
            'No role matching %r found in the SAML assertion' % (rolearnselection,))
    chunks = selected.split(',')
    return chunks[0], chunks[1]


def aws_session(username, password, rolearnselection, idpentryurl=defaultIdpEntryUrl, region=defaultRegion, sslverification=defaultSslVerification):
    """Log in through a SAML IdP and return an auto-refreshing boto3 session.

    The IdP login form at *idpentryurl* is fetched, filled in and submitted;
    the SAML assertion in the reply is exchanged with STS
    (AssumeRoleWithSAML) for temporary credentials.  The login is wrapped in
    botocore RefreshableCredentials, with the login routine as the refresh
    callback, so *username* and *password* stay referenced for the lifetime
    of the returned session.

    Args:
        username: login name submitted to the IdP form.
        password: password submitted to the IdP form.
        rolearnselection: substring identifying the role to assume among the
            roles listed in the assertion.
        idpentryurl: URL that starts the IdP-initiated sign-on.
        region: AWS region configured on the returned session.
        sslverification: passed as ``verify`` to every HTTPS request, both
            to the IdP and to STS.

    Returns:
        A ``boto3.Session`` backed by refreshable credentials.

    Raises:
        RuntimeError: the IdP response contained no SAML assertion
            (previously this printed a message and called ``sys.exit(0)``).
        ValueError: no role in the assertion matches *rolearnselection*.
    """
    def refresh():
        # The HTTP session is only needed for the form round-trip; close it
        # afterwards so repeated refreshes do not leak connections.
        with requests.Session() as session:
            # Follows all redirects and lands on the login page.
            formresponse = session.get(idpentryurl, verify=sslverification)
            # Response.text is already decoded text; it must not be decoded
            # a second time.
            formsoup = BeautifulSoup(formresponse.text)
            payload = _build_login_payload(formsoup, username, password)
            idpauthformsubmiturl = _resolve_form_submit_url(
                formsoup, idpentryurl, formresponse.url)
            response = session.post(
                idpauthformsubmiturl, data=payload, verify=sslverification)

        assertion = _extract_saml_assertion(BeautifulSoup(response.text))
        role_arn, principal_arn = _select_role(
            _parse_saml_roles(assertion), rolearnselection)

        # Honour the caller's sslverification setting for the STS call too.
        sts = boto3.Session().client('sts', verify=sslverification)
        credentials = sts.assume_role_with_saml(
            RoleArn=role_arn,
            PrincipalArn=principal_arn,
            SAMLAssertion=assertion
        )['Credentials']
        return dict(
            access_key=credentials['AccessKeyId'],
            secret_key=credentials['SecretAccessKey'],
            token=credentials['SessionToken'],
            # create_from_metadata expects a string it can parse again
            expiry_time=credentials['Expiration'].isoformat()
        )

    session_credentials = botocore.credentials.RefreshableCredentials.create_from_metadata(
        metadata=refresh(),
        refresh_using=refresh,
        method='sts-assume-role-with-saml')
    sess = botocore.session.get_session()
    sess._credentials = session_credentials
    sess.set_config_variable('region', region)
    return boto3.Session(botocore_session=sess)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment