Skip to content

Instantly share code, notes, and snippets.

@jewelsjacobs
Created July 24, 2023 21:21
Show Gist options
  • Save jewelsjacobs/8dc644ea35f9cd7b3891fc47924506a1 to your computer and use it in GitHub Desktop.
Save jewelsjacobs/8dc644ea35f9cd7b3891fc47924506a1 to your computer and use it in GitHub Desktop.
LoadConsoleData Python Lambda App
import os
import boto3
import coto
from botocore.exceptions import ClientError
import logging
from pythonjsonlogger import jsonlogger
from datetime import datetime
import json
class DateTimeEncoder(json.JSONEncoder):
def default(self, o):
if isinstance(o, datetime):
return o.isoformat()
return json.JSONEncoder.default(self, o)
session = coto.Session(
boto3_session=boto3.Session()
)
def setup_logging(log_level):
""" structured json logger handler
Include pythonjsonlogger package or create layer
https://github.com/madzak/python-json-logger
"""
logger = logging.getLogger()
# Testing showed lambda sets up one default handler. If there are more,
# something has changed and we want to fail so an operator can investigate.
assert len(logger.handlers) == 1
logger.setLevel(log_level)
json_handler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter(
fmt='%(asctime)s %(levelname)s %(name)s %(message)s'
)
json_handler.setFormatter(formatter)
logger.addHandler(json_handler)
logger.removeHandler(logger.handlers[0])
setup_logging(logging.INFO)
logger = logging.getLogger()
def create_session(account_number, role_name):
logger.info("Assuming role {} for within account {}.".format(role_name, account_number))
try:
# Beginning the assume role process for account
sts_client = boto3.client('sts')
# Get the current partition
partition = sts_client.get_caller_identity()['Arn'].split(":")[1]
# Assume the role within the account
response = sts_client.assume_role(
RoleArn='arn:{}:iam::{}:role/{}'.format(partition, account_number, role_name),
RoleSessionName=role_name
)
# Storing account credentials into coto session https://github.com/sentialabs/coto
logger.info("Granted as role {} within account {}, storing temporary credentials.".format(role_name,
account_number))
session = boto3.Session(
aws_access_key_id=response['Credentials']['AccessKeyId'],
aws_secret_access_key=response['Credentials']['SecretAccessKey'],
aws_session_token=response['Credentials']['SessionToken']
)
logger.info("Credentials for session in the account {} established.".format(account_number))
return session
except ClientError as error:
logger.error(error)
return False
except Exception as error:
logger.error(error)
return False
def get_contact_info():
try:
client = session.client('billing')
list_alternate_contacts = client.list_alternate_contacts()
alt_contacts=[]
for contact in list_alternate_contacts:
alt_contacts.append({ 'M' : {
"name": {
"S": contact["name"]
},
"contactType": {
"S": contact["contactType"]
},
"contactId": {
"S": str(contact["contactId"])
},
"email": {
"S": contact["email"]
},
"phoneNumber": {
"S": contact["phoneNumber"]
},
"title": {
"S": contact["title"]
}
}})
alt_contact_items={ 'M': {
'AlternateContacts': {
'L': alt_contacts
}
}
}
logger.info(alt_contact_items)
return alt_contact_items
except ClientError as error:
logger.error(error)
return False
def get_member_account_ids():
try:
organizations = boto3.client('organizations')
response = organizations.list_accounts()
results = response["Accounts"]
while "NextToken" in response:
response = organizations.list_accounts(NextToken=response["NextToken"])
results.extend(response["Accounts"])
accounts = []
for result in results:
accounts.append(result['Id'])
logger.info(accounts)
return accounts
except ClientError as error:
logger.error(error)
return False
def get_control_info(account_id, contacts_db, dynamo_db_session):
try:
client = dynamo_db_session.client('dynamodb')
response = client.query(
TableName=contacts_db,
IndexName='Account-Index',
FilterExpression='evidencetype = :evidencetype',
KeyConditionExpression='accountid = :accountid and begins_with (accountidcontrol, :accountid)',
ProjectionExpression='#id, controlaccountid',
ExpressionAttributeNames={"#id": "ID"},
ExpressionAttributeValues={
':accountid': {'S': str(account_id)},
':evidencetype': {'S': "alternatecontacts"}
}
)
return response['Items']
except ClientError as error:
logger.error(error)
return False
def update(contact_info, control, contacts_db, dynamo_db_session):
try:
client = dynamo_db_session.client('dynamodb')
response = client.update_item(
TableName=contacts_db,
Key={
'ID': control["ID"],
'controlaccountid': control["controlaccountid"]
},
UpdateExpression='set evidence = :evidence, configrule = :configrule, arn = :arn, #ST = :compliancestatus',
ExpressionAttributeNames={'#ST': 'status'},
ExpressionAttributeValues={
':evidence': contact_info,
':configrule': {'S': "NA"},
':arn': {'S': "NA"},
':compliancestatus': {'S': "COMPLIANT"}
},
ReturnValues="UPDATED_NEW"
)
return response
except ClientError as error:
logger.error(error)
return False
def lambda_handler(event, context):
# logger.info("boto3 version: {}".format(boto3.__version__))
# logger.info('## EVENT')
# logger.info(event)
# logger.info('## ENVIRONMENT VARIABLES')
# logger.info("ContactsDB: {}".format(os.environ['ContactsDB']))
contacts_db = os.environ['ContactsDB']
# logger.info("SecHubRole: {}".format(os.environ['SecHubRole']))
dynamo_db_role = os.environ['DynamoDBRole']
# logger.info("DynamoDBRole: {}".format(os.environ['DynamoDBRole']))
security_account_id = os.environ['SecurityAccountID']
try:
dynamo_db_session = create_session(security_account_id, dynamo_db_role)
account_ids = get_member_account_ids()
for account_id in account_ids:
controls = get_control_info(account_id, contacts_db, dynamo_db_session)
for control in controls:
contact_info = get_contact_info()
logger.info(contact_info)
item = update(contact_info, control, contacts_db, dynamo_db_session)
logger.info(item)
return True
except ClientError as error:
logger.error(error)
return False
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment