Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Rotate AWS credential for on-prem servers which has tag on-prem:bamboo-agent
import os
import json
import logging
import time
import boto3
logger = logging.getLogger()
logger.setLevel(logging.INFO)
sts_client = boto3.client("sts")
ssm_client = boto3.client("ssm")
ROLE_ARN = os.environ['ROLE_ARN']
DOCUMENT_NAME = 'AWS-RunShellScript'
LOG_GROUP = 'ssm-rotate-on-prem-bamboo-credentials'
SESSION_NAME = 'rotate-on-prem-bamboo-credentials'
COMMAND_TARGET=[
{
'Key': 'tag:on-prem',
'Values': ['bamboo-agent']
},
]
def assume_role_for_credential(role_arn: str, session_name: str) -> dict:
"""Assume IAM role then return the credential"""
credential = {}
role = sts_client.assume_role(
RoleArn=role_arn,
RoleSessionName=session_name
)
credential['AccessKeyId']=role['Credentials']['AccessKeyId']
credential['SecretAccessKey']=role['Credentials']['SecretAccessKey']
credential['SessionToken']=role['Credentials']['SessionToken']
return credential
def generate_command_params() -> dict:
"""Form the params for run command"""
params = {
'workingDirectory': [''],
'executionTimeout': ['300'],
'commands': []
}
credential = assume_role_for_credential(ROLE_ARN, SESSION_NAME)
params['commands'].append("echo [default] > ~/.aws/credentials")
params['commands'].append(f"echo aws_access_key_id={credential['AccessKeyId']} >> ~/.aws/credentials")
params['commands'].append(f"echo aws_secret_access_key={credential['SecretAccessKey']} >> ~/.aws/credentials")
params['commands'].append(f"echo aws_session_token={credential['SessionToken']} >> ~/.aws/credentials")
print(params)
return params
def is_response_code_200(response: dict) -> bool:
"""Check if response code is 200"""
try:
if response['ResponseMetadata']['HTTPStatusCode'] == 200:
logger.info('Response code is 200')
return True
else:
logger.error(f"Response code is {response['ResponseMetadata']['HTTPStatusCode']}")
return False
except Exception as e:
logger.error(f'Response error: {e}')
return False
def send_command(params: dict) -> str:
"""Send SSM run command"""
try:
response = ssm_client.send_command(
Targets=COMMAND_TARGET,
DocumentName=DOCUMENT_NAME,
Parameters=params,
CloudWatchOutputConfig={
'CloudWatchLogGroupName': LOG_GROUP,
'CloudWatchOutputEnabled': True
},
TimeoutSeconds=120
)
if is_response_code_200(response):
logger.info(f'Command is sent successfully: {response}')
return response['Command']['CommandId']
else:
logger.error(f'Command is sent, but response is not 200: {response}')
return None
except Exception as e:
logger.error(f'Command could not be sent: {e}')
return None
def is_command_invocation_success(command_id: str) -> bool:
"""Check if SSM run command invocation status is success"""
timewait = 1
while True:
response = ssm_client.list_command_invocations(
CommandId=command_id,
Details=False
)
if is_response_code_200(response):
if response['CommandInvocations']:
invocation_status = response['CommandInvocations'][0]['Status']
if invocation_status != 'Pending':
if invocation_status == 'InProgress' or invocation_status == 'Success':
logging.info(f'Command invocation status: {invocation_status}')
if invocation_status == 'Success':
return True
else:
logging.error(f'Command invocation failed: {response}')
return False
time.sleep(timewait)
timewait += timewait
def lambda_handler(event, context):
"""Lambda Handler"""
logging.info(json.dumps(event))
command_params = generate_command_params()
command_id = send_command(command_params)
is_command_invocation_success(command_id)
if __name__ == '__main__':
lambda_handler({}, {})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment