Skip to content

Instantly share code, notes, and snippets.

@yardbirdsax
Created January 17, 2020 16:34
Show Gist options
  • Save yardbirdsax/b9d181e7d262c8edb72a9d328a8cfe7f to your computer and use it in GitHub Desktop.
Save yardbirdsax/b9d181e7d262c8edb72a9d328a8cfe7f to your computer and use it in GitHub Desktop.
Set S3 Access Logging Across Multiple S3 Buckets
#!/usr/bin/env python3
import boto3
import botocore
import argparse
from pprint import pprint
import re
import logging
# Command-line interface. Only --target-bucket is mandatory; every other
# option has a default that selects all buckets and applies the change.
parser = argparse.ArgumentParser(
    description="Sets S3 Access Logging configuration on all buckets within an account matching a certain pattern.",
)
parser.add_argument(
    '--target-bucket',
    type=str,
    required=True,
    help="The name of the bucket to use for the logging target.",
)
parser.add_argument(
    '--pattern',
    type=str,
    default=".*",
    help="The (regex) pattern used to match against bucket names.",
)
parser.add_argument(
    '--target-prefix',
    type=str,
    default="/",
    help="The prefix to use when generating the log files.",
)
# Dry-run switch: previously had no help text, so it was invisible in --help.
parser.add_argument(
    '--not-for-real',
    action="store_true",
    help="Dry run: only log the logging configuration that would be applied, without changing any bucket.",
)
# Repeatable option; values accumulate in args.exclude_names (None when the
# option is never given).
parser.add_argument(
    '--exclude-name',
    action='append',
    dest='exclude_names',
    type=str,
    help="Exclude buckets with this name. Can be specified multiple times.",
)
def get_bucket_region(Bucket: str, s3_client: "botocore.client.BaseClient") -> str:
    """Return the AWS region that hosts the given S3 bucket.

    The region is taken from the ``x-amz-bucket-region`` header found in the
    response metadata of a ``head_bucket`` call.

    Args:
        Bucket: Name of the bucket to look up.
        s3_client: An S3 client object exposing ``head_bucket`` (for example
            the object returned by ``boto3.client('s3')``).

    Returns:
        The value of the ``x-amz-bucket-region`` response header.

    Raises:
        KeyError: If the response does not carry the expected metadata keys.
        Any exception raised by ``head_bucket`` propagates to the caller.
    """
    # The annotation above is a string on purpose: ``boto3.client`` is a
    # factory function rather than a type, and a string annotation is not
    # evaluated when the function is defined.
    response = s3_client.head_bucket(Bucket=Bucket)
    return response['ResponseMetadata']['HTTPHeaders']['x-amz-bucket-region']
# Root logging setup: INFO and above, with the level name left-aligned in a
# 12-character column ahead of the message.
logging.basicConfig(
    format="%(levelname)-12s: %(message)s",
    level=logging.INFO,
)
# Raise botocore's threshold so its lower-level messages do not clutter the
# script's own output.
logging.getLogger("botocore").setLevel(logging.WARNING)
# Module-level logger used by the rest of the script.
logger = logging.getLogger(__name__)
# Main script body: enable S3 access logging on every bucket that
#   * is not the logging target itself,
#   * matches --pattern (anchored at the start of the name, via re.match),
#   * is not listed in --exclude-name,
#   * has no logging configuration yet, and
#   * lives in the same region as the target bucket.
# With --not-for-real the configuration is only logged, never applied.
# A botocore ClientError raised anywhere in the run is logged and ends the
# run; any other exception propagates unchanged.
try:
    args = parser.parse_args()
    s3_client = boto3.client('s3')
    s3_resource = boto3.resource('s3')
    buckets = s3_client.list_buckets()
    target_bucket_region = get_bucket_region(args.target_bucket, s3_client)

    # Loop invariants, built once: the compiled name pattern, the exclusion
    # set (args.exclude_names is None when the option was never given) and
    # the logging configuration, which is the same for every bucket.
    name_pattern = re.compile(args.pattern)
    excluded_names = frozenset(args.exclude_names or ())
    bucket_logging_status = {
        "LoggingEnabled": {
            "TargetBucket": args.target_bucket,
            "TargetPrefix": args.target_prefix,
        }
    }

    for bucket in buckets['Buckets']:
        bucket_name = bucket['Name']

        if bucket_name == args.target_bucket:
            logger.warning(f"Bucket {bucket_name} is also the target bucket, skipping.")
            continue

        if not name_pattern.match(bucket_name):
            continue

        # Check exclusions before touching the bucket, so an excluded bucket
        # costs no API call and is never announced as about to be changed.
        if bucket_name in excluded_names:
            logger.info(f"Bucket {bucket_name} is included in the list of excluded buckets and will be skipped.")
            continue

        bucket_logging = s3_resource.BucketLogging(bucket_name)
        bucket_logging.load()
        if bucket_logging.logging_enabled is not None:
            # Logging is already configured; leave the bucket alone.
            continue
        logger.info(f"Bucket {bucket_name} does not have logging enabled. Setting to target bucket.")

        bucket_region = get_bucket_region(bucket_name, s3_client)
        if bucket_region != target_bucket_region:
            logger.warning(f"Bucket {bucket_name} ({bucket_region}) is not in the same region as target bucket {args.target_bucket} ({target_bucket_region}). Skipping.")
            continue

        if args.not_for_real:
            logger.info(f"Would have set bucket logging status to: {bucket_logging_status}.")
        else:
            bucket_logging.put(BucketLoggingStatus=bucket_logging_status)
except botocore.exceptions.ClientError as err:
    logger.error(f"Boto3 client error occurred: {err}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment