Skip to content

Instantly share code, notes, and snippets.

@katebee
Created July 20, 2020 17:07
Show Gist options
  • Save katebee/3d44bd656d6a2dac651868ce2ef7c22f to your computer and use it in GitHub Desktop.
Save katebee/3d44bd656d6a2dac651868ce2ef7c22f to your computer and use it in GitHub Desktop.
import argparse
import logging
import sys
from typing import List, Dict
import boto3
from botocore.client import BaseClient
from botocore.exceptions import ClientError, ProfileNotFound
__description__ = "Run AWS API calls in all regions to enable Organization GuardDuty"
# Get the available regions, then for each region:
# 1. Enable a GuardDuty detector in the organisation administrator account
# This is required since the delegated administrator does not have permissions
# to configure GuardDuty detectors in the organisation administrator account.
# 2. Delegate GuardDuty administration to a member account
# AWS Security best practices recommend using a member account rather than the
# organisation admin for this to adhere to the principle of least of privilege.
# 3. Enable all member accounts and auto-enable any new members=
# Existing accounts may need to have a detector created and enabled before they
# start reporting centrally, which can be done by calling create_members.
def create_session(profile: str, region: str):
try:
return boto3.Session(profile_name=profile, region_name=region)
except ProfileNotFound as exception:
print(f"ProfileNotFound: {exception}", flush=True)
exit(-1)
except ClientError as exception:
print(f"ClientError: {exception}", flush=True)
exit(-1)
def with_retries(max_retries: int, handler: BaseClient, method_to_call: str, parameters: Dict) -> Dict:
data = None
try:
for retry in range(max_retries):
if handler.can_paginate(method_to_call):
paginator = handler.get_paginator(method_to_call)
page_iterator = paginator.paginate(**parameters)
for response in page_iterator:
if not data:
data = response
else:
for k in data:
if isinstance(data[k], list):
data[k].extend(response[k])
else:
function = getattr(handler, method_to_call)
data = function(**parameters)
except ClientError as exception:
print(f"ClientError: {exception}", flush=True)
return {
'exception': exception
}
return data
def get_regions(session) -> List[str]:
ec2_client = session.client('ec2')
regions = []
try:
response = ec2_client.describe_regions()
except ClientError as exception:
print(f"ClientError: {exception}", flush=True)
else:
for region in response['Regions']:
regions.append(region['RegionName'])
return regions
def get_member_details(session, admin_account_id: str):
client = session.client('organizations')
accounts = with_retries(1, client, 'list_accounts', {})
details = []
for account in accounts['Accounts']:
if account['Id'] != admin_account_id:
details.append(
{
'AccountId': account['Id'],
'Email': account['Email']
}
)
return details
def enable_members(profile: str, account_details: List[Dict], region: str):
session = create_session(profile, region)
client = session.client('guardduty')
all_detectors = client.list_detectors()['DetectorIds']
if len(all_detectors) == 1:
try:
client.update_organization_configuration(
DetectorId=all_detectors[0],
AutoEnable=True
)
unprocessed_accounts = client.create_members(
DetectorId=all_detectors[0],
AccountDetails=account_details
)
return unprocessed_accounts
except ClientError as exception:
print(f"ClientError: {exception}", flush=True)
# you must create a detector in each Region where you enable the service.
# You can have only one detector per account per Region
def create_detector(client: BaseClient):
detectors = client.list_detectors()['DetectorIds']
if not detectors:
print(f" >> Creating detector for region", flush=True)
client.create_detector(Enable=True)
def assign_admin(client: BaseClient, admin_account: str):
admins = client.list_organization_admin_accounts()['AdminAccounts']
if not admins:
print(f" >> Setting delegated administrator", flush=True)
client.enable_organization_admin_account(AdminAccountId=admin_account)
def delegate_administrator(profile: str, admin_account: str, region: str) -> Dict:
session = create_session(profile, region)
client = session.client('guardduty')
try:
create_detector(client)
assign_admin(client, admin_account)
except ClientError as exception:
print(f"ClientError: {exception}", flush=True)
return {
'exception': exception
}
def run(arguments):
parser = argparse.ArgumentParser()
parser.add_argument(
"--admin-account",
help="Account ID of the delegated administrator account for GuardDuty",
required=True,
type=str,
)
parser.add_argument(
"--profile",
help="AWS profile name of the organization manager account",
required=True,
type=str,
dest="profile",
)
parser.add_argument(
"--admin-profile",
help="AWS profile name of the delegated administrator account for GuardDuty",
required=True,
type=str,
)
args = parser.parse_args(arguments)
logging.getLogger("botocore").setLevel(logging.WARN)
# create client that we need to discover all regions and organization members
# the region is required but it does not matter which one is used
session = create_session(args.profile, 'eu-west-1')
members = get_member_details(session, args.admin_account)
regions = get_regions(session)
# TODO: add check that the organization manager account and the delegated administrator are different accounts
for region in regions:
print(f"⭐️ Configuring GuardDuty in {region}", flush=True)
delegate_administrator(args.profile, args.admin_account, region)
print(f" >> Enabling {len(members)} member accounts", flush=True)
enable_members(args.admin_profile, members, region)
if __name__ == "__main__":
run(sys.argv[1:])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment