Created
July 20, 2020 17:07
-
-
Save katebee/3d44bd656d6a2dac651868ce2ef7c22f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
import logging | |
import sys | |
from typing import List, Dict | |
import boto3 | |
from botocore.client import BaseClient | |
from botocore.exceptions import ClientError, ProfileNotFound | |
# One-line summary of the script's purpose; not referenced elsewhere in this file.
__description__ = "Run AWS API calls in all regions to enable Organization GuardDuty"
# Get the available regions, then for each region: | |
# 1. Enable a GuardDuty detector in the organisation administrator account | |
# This is required since the delegated administrator does not have permissions | |
# to configure GuardDuty detectors in the organisation administrator account. | |
# 2. Delegate GuardDuty administration to a member account | |
# AWS Security best practices recommend using a member account rather than the | |
# organisation admin for this to adhere to the principle of least privilege.
# 3. Enable all member accounts and auto-enable any new members
# Existing accounts may need to have a detector created and enabled before they | |
# start reporting centrally, which can be done by calling create_members. | |
def create_session(profile: str, region: str):
    """Create a boto3 session for the given profile and region.

    Args:
        profile: AWS profile name passed to ``boto3.Session``.
        region: Region name passed to ``boto3.Session``.

    Returns:
        The ``boto3.Session`` built from ``profile`` and ``region``.

    If ``ProfileNotFound`` or ``ClientError`` is raised while building the
    session, the error is printed and the process exits with status ``-1``.
    """
    try:
        return boto3.Session(profile_name=profile, region_name=region)
    except ProfileNotFound as exception:
        print(f"ProfileNotFound: {exception}", flush=True)
    except ClientError as exception:
        print(f"ClientError: {exception}", flush=True)
    # Use sys.exit() rather than the bare exit() helper: the latter is
    # injected by the site module for interactive use and is not guaranteed
    # to exist (e.g. when the interpreter is started with -S).
    sys.exit(-1)
def with_retries(max_retries: int, handler: BaseClient, method_to_call: str, parameters: Dict) -> Dict:
    """Call a client method, retrying on ``ClientError``, and return its result.

    When the client can paginate ``method_to_call``, every page is fetched and
    merged into the first page: for each key of the first page whose value is
    a list, the matching lists of later pages are appended to it.

    Args:
        max_retries: Maximum number of attempts to make.
        handler: Client object exposing ``can_paginate``, ``get_paginator``
            and the method named by ``method_to_call``.
        method_to_call: Name of the client method to invoke.
        parameters: Keyword arguments forwarded to the call.

    Returns:
        The (merged) response of the first successful attempt. If every
        attempt raised ``ClientError``, ``{'exception': <last error>}``.
        ``None`` if no attempt was made (``max_retries`` is 0) or the
        paginator produced no pages.
    """
    last_error = None
    for _attempt in range(max_retries):
        # The try block lives inside the loop so that a failure moves on to
        # the next attempt and a success returns immediately; a successful
        # call must never be repeated.
        try:
            if not handler.can_paginate(method_to_call):
                return getattr(handler, method_to_call)(**parameters)

            # Start from scratch on every attempt so that pages collected by
            # a partially failed attempt are not merged twice.
            merged = None
            for page in handler.get_paginator(method_to_call).paginate(**parameters):
                if not merged:
                    merged = page
                    continue
                for key, value in merged.items():
                    if isinstance(value, list):
                        # A later page may omit a key the first page had.
                        value.extend(page.get(key, []))
            return merged
        except ClientError as exception:
            print(f"ClientError: {exception}", flush=True)
            last_error = exception

    if last_error is not None:
        return {
            'exception': last_error
        }
    return None
def get_regions(session) -> List[str]:
    """Return the region names reported by EC2 ``describe_regions``.

    Args:
        session: Session object used to create the ``ec2`` client.

    Returns:
        The ``RegionName`` of every entry in the response's ``Regions`` list,
        or an empty list if the call raised ``ClientError`` (the error is
        printed).
    """
    ec2 = session.client('ec2')
    try:
        described = ec2.describe_regions()
    except ClientError as exception:
        print(f"ClientError: {exception}", flush=True)
        return []
    return [entry['RegionName'] for entry in described['Regions']]
def get_member_details(session, admin_account_id: str) -> List[Dict]:
    """List the organisation's accounts, excluding the given admin account.

    Args:
        session: Session object used to create the ``organizations`` client.
        admin_account_id: Account ID to leave out of the result.

    Returns:
        One ``{'AccountId': ..., 'Email': ...}`` dict per account returned by
        ``list_accounts`` whose ``Id`` differs from ``admin_account_id``.

    Raises:
        The exception captured by ``with_retries`` when listing the accounts
        failed.
    """
    client = session.client('organizations')
    accounts = with_retries(1, client, 'list_accounts', {})
    # with_retries reports a failure as {'exception': error}. Surface that
    # error instead of failing below with an unrelated KeyError on 'Accounts'.
    if accounts and 'exception' in accounts:
        raise accounts['exception']
    return [
        {
            'AccountId': account['Id'],
            'Email': account['Email']
        }
        for account in accounts['Accounts']
        if account['Id'] != admin_account_id
    ]
def enable_members(profile: str, account_details: List[Dict], region: str):
    """Turn on auto-enable and register member accounts with GuardDuty.

    Nothing is changed unless the account behind ``profile`` has exactly one
    detector in ``region``.

    Args:
        profile: AWS profile name used to create the session.
        account_details: ``{'AccountId': ..., 'Email': ...}`` dicts of the
            accounts to register as members.
        region: Region in which to configure GuardDuty.

    Returns:
        ``{'UnprocessedAccounts': [...]}`` collecting the unprocessed accounts
        reported by every ``create_members`` call, or ``None`` when there is
        not exactly one detector or a ``ClientError`` was raised (the error
        is printed).
    """
    # The CreateMembers API accepts between 1 and 50 accounts per request, so
    # the accounts are submitted in batches and an empty list sends nothing.
    batch_size = 50

    session = create_session(profile, region)
    client = session.client('guardduty')
    try:
        # Looked up inside the try block so a failure in one region is
        # reported like every other GuardDuty call here instead of aborting.
        all_detectors = client.list_detectors()['DetectorIds']
        if len(all_detectors) != 1:
            return None
        detector_id = all_detectors[0]

        client.update_organization_configuration(
            DetectorId=detector_id,
            AutoEnable=True
        )

        unprocessed_accounts = []
        for start in range(0, len(account_details), batch_size):
            response = client.create_members(
                DetectorId=detector_id,
                AccountDetails=account_details[start:start + batch_size]
            )
            unprocessed_accounts.extend(response.get('UnprocessedAccounts', []))
        return {
            'UnprocessedAccounts': unprocessed_accounts
        }
    except ClientError as exception:
        print(f"ClientError: {exception}", flush=True)
        return None
# A detector has to exist in every region where the service is enabled,
# and an account can have only one detector per region.
def create_detector(client: BaseClient):
    """Create an enabled detector if the client reports none.

    Args:
        client: GuardDuty client exposing ``list_detectors`` and
            ``create_detector``.
    """
    existing_ids = client.list_detectors()['DetectorIds']
    if existing_ids:
        # A detector is already present; leave it untouched.
        return
    print(" >> Creating detector for region", flush=True)
    client.create_detector(Enable=True)
def assign_admin(client: BaseClient, admin_account: str):
    """Set the delegated administrator if the client reports none.

    Args:
        client: GuardDuty client exposing ``list_organization_admin_accounts``
            and ``enable_organization_admin_account``.
        admin_account: Account ID passed as ``AdminAccountId``.
    """
    current_admins = client.list_organization_admin_accounts()['AdminAccounts']
    if current_admins:
        # Some administrator is already registered; nothing is changed.
        return
    print(" >> Setting delegated administrator", flush=True)
    client.enable_organization_admin_account(AdminAccountId=admin_account)
def delegate_administrator(profile: str, admin_account: str, region: str) -> Dict:
    """Ensure a detector exists and a delegated administrator is set in a region.

    Args:
        profile: AWS profile name used to create the session.
        admin_account: Account ID to register as delegated administrator.
        region: Region in which to operate.

    Returns:
        ``{'exception': error}`` if a ``ClientError`` was raised (the error is
        also printed); ``None`` otherwise.
    """
    guardduty = create_session(profile, region).client('guardduty')
    try:
        create_detector(guardduty)
        assign_admin(guardduty, admin_account)
    except ClientError as exception:
        print(f"ClientError: {exception}", flush=True)
        failure = {'exception': exception}
        return failure
    return None
def run(arguments):
    """Parse the command line and configure GuardDuty in every region.

    Args:
        arguments: Command-line arguments, without the program name.

    For each region returned by ``get_regions``, the detector and delegated
    administrator are set up with ``--profile`` and the member accounts are
    then enabled with ``--admin-profile``.
    """
    # All three options are required strings; only the flag and help differ.
    required_options = (
        ("--admin-account", "Account ID of the delegated administrator account for GuardDuty"),
        ("--profile", "AWS profile name of the organization manager account"),
        ("--admin-profile", "AWS profile name of the delegated administrator account for GuardDuty"),
    )
    parser = argparse.ArgumentParser()
    for flag, help_text in required_options:
        parser.add_argument(flag, help=help_text, required=True, type=str)
    args = parser.parse_args(arguments)

    logging.getLogger("botocore").setLevel(logging.WARN)

    # Session used to discover all regions and organization members.
    # A region is required, but it does not matter which one is used.
    discovery_session = create_session(args.profile, 'eu-west-1')
    members = get_member_details(discovery_session, args.admin_account)

    # TODO: add check that the organization manager account and the delegated administrator are different accounts

    for region in get_regions(discovery_session):
        print(f"⭐️ Configuring GuardDuty in {region}", flush=True)
        delegate_administrator(args.profile, args.admin_account, region)

        print(f" >> Enabling {len(members)} member accounts", flush=True)
        enable_members(args.admin_profile, members, region)
# Script entry point: forward the command-line arguments (minus the program
# name) to run().
if __name__ == "__main__":
    cli_arguments = sys.argv[1:]
    run(cli_arguments)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment