Skip to content

Instantly share code, notes, and snippets.

@alisade
Created January 20, 2021 19:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save alisade/a087384d9a62ad0726b4596e61123aef to your computer and use it in GitHub Desktop.
Save alisade/a087384d9a62ad0726b4596e61123aef to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import boto3
import json
import uuid
import sys
from time import sleep
import backoff
from botocore import exceptions
if len(sys.argv) < 2 :
print("Usage: " + sys.argv[0] + " SOURCE_ROLE_NAME [AWS_PROFILE]")
exit(1)
source_role = sys.argv[1]
# create a copy of source role based of the name of the
# source role and the macaddress of the machine creating
# the role this will make sure a user will not generate too
# many roles as assumed roles ttl out
destination_role = str(uuid.uuid5(namespace=uuid.NAMESPACE_OID, name=source_role + str(uuid.getnode())))
try:
profile = sys.argv[2]
session = boto3.Session(profile_name=profile)
sts = session.client('sts')
iam = session.client('iam')
except:
sts = boto3.client('sts')
iam = boto3.client('iam')
try:
sts_identity = sts.get_caller_identity()['Arn']
except exceptions.ClientError as err:
if err.response['Error']['Code'] == 'ExpiredToken':
print('You need to re-authenticate with Okta and try again')
exit(1)
role = iam.get_role(RoleName=source_role)
assumed_role_policy = role['Role']['AssumeRolePolicyDocument']
# sleep for 30 seconds max
for remaining in range(30, 0, -1):
sys.stdout.write("\r")
sys.stdout.write("{:2d} Creating a new copy of the role...".format(remaining))
sys.stdout.flush()
try:
# duplicate role
iam.create_role(
RoleName=destination_role,
MaxSessionDuration=8 * 3600, # 8 hours max session
AssumeRolePolicyDocument=json.dumps(assumed_role_policy),
Tags=[
{
'Key': 'DUPLICATE-OF',
'Value': source_role
},
{
'Key': 'TEMPORARY_WILL_BE_DELETED',
'Value': 'True'
}
]
)
break
except iam.exceptions.EntityAlreadyExistsException:
# we will delete and recreate the same role again
# this makes sure we always get a fresh and untouched
# copy of the source role
response = iam.list_attached_role_policies(RoleName=destination_role)
for _policy in response.get('AttachedPolicies'):
_resp = iam.detach_role_policy(
RoleName=destination_role,
PolicyArn=_policy.get('PolicyArn')
)
response = iam.list_role_policies(RoleName=destination_role)
for _policy in response.get('PolicyNames'):
dest = iam.delete_role_policy(
RoleName=destination_role,
PolicyName=_policy
)
iam.delete_role(RoleName=destination_role)
sleep(1)
sys.stdout.write("\rRole copy created sucessfully. \n")
# Copy source role Inline Policies
response = iam.list_role_policies(RoleName=source_role)
for _policy in response.get('PolicyNames'):
_src = iam.get_role_policy(
RoleName=source_role,
PolicyName=_policy
)
dest = iam.put_role_policy(
RoleName=destination_role,
PolicyName=_policy,
PolicyDocument=json.dumps(_src.get('PolicyDocument'))
)
# Copy source role managed policies
response = iam.list_attached_role_policies(RoleName=source_role)
for _policy in response.get('AttachedPolicies'):
_resp = iam.attach_role_policy(
RoleName=destination_role,
PolicyArn=_policy.get('PolicyArn')
)
# find the currently assumed role
role_principal_to_add = ('/').join(sts_identity.split('/')[0:2]).replace('sts','iam').replace('assumed-','')
# find source policy statements
assumed_role_policy_statements = role['Role']['AssumeRolePolicyDocument']['Statement']
# append current role to trusted principals
assumed_role_policy_statements.append({'Effect': 'Allow', 'Principal': {'AWS': role_principal_to_add}, 'Action': 'sts:AssumeRole'})
# apply changes to the principals of the destination role
response = iam.update_assume_role_policy(
RoleName=destination_role,
PolicyDocument=json.dumps(assumed_role_policy)
)
role = iam.get_role(RoleName=destination_role)
@backoff.on_exception(backoff.expo, sts.exceptions.ClientError)
def assume_role(sts, role):
print('Waiting for the role to become available to assume...')
return sts.assume_role(RoleArn=role['Role']['Arn'],
RoleSessionName=str(uuid.uuid4()))
response = assume_role(sts, role)
def aws_envs():
print('RoleArn:', role['Role']['Arn'])
print('Run your docker command as below:')
print('=================================')
print('docker run -e AWS_ACCESS_KEY_ID="{}"'.format(response['Credentials']['AccessKeyId']), end=" ")
print('-e AWS_SECRET_ACCESS_KEY="{}"'.format(response['Credentials']['SecretAccessKey']), end=" ")
print('-e AWS_SESSION_TOKEN="{}"'.format(response['Credentials']['SessionToken']))
if __name__ == '__main__':
aws_envs()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment