Skip to content

Instantly share code, notes, and snippets.

@dallasmarlow
Created November 5, 2020 20:01
Show Gist options
Save dallasmarlow/eac82c54cec36479f32748fd5d41d2e7 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import os.path
import re
import sys
import time

import boto3
# Region the S3/Athena API calls are issued against (not the regions being indexed).
aws_region = 'us-east-2'
# Matches AWS region names such as 'us-east-2'; used to skip non-region prefixes
# (e.g. CloudTrail digest folders) when walking the bucket.
region_pattern = '^[a-z]+-[a-z]+-[0-9]{1}'
# Bucket and key prefix where CloudTrail delivers its logs.
# NOTE(review): '<account id>' is a placeholder — substitute the real AWS account id.
s3_bucket = 'periscope-cloudtrail-audit'
s3_path = 'AWSLogs/<account id>/CloudTrail/'
s3_client = boto3.client('s3', region_name=aws_region)
athena_client = boto3.client('athena', region_name=aws_region)
# Athena database/table to (re)create, and the S3 location Athena writes query results to.
athena_database = 'cloudtrail'
athena_table = 'cloudtrail_logs'
athena_result_location = 's3://<account id>-athena-results/cloudtrail'
# DDL templates; '{}' placeholders are filled via str.format() below.
drop_table_statement = 'drop table if exists {}'
# Athena/Hive DDL for CloudTrail logs using the EMR CloudTrail SerDe.
# Format placeholders: table name, then the 'bucket/prefix' holding the logs.
create_table_statement = """
CREATE EXTERNAL TABLE `{}`(
`eventversion` string COMMENT 'from deserializer',
`useridentity` struct<type:string,principalid:string,arn:string,accountid:string,invokedby:string,accesskeyid:string,username:string,sessioncontext:struct<attributes:struct<mfaauthenticated:string,creationdate:string>,sessionissuer:struct<type:string,principalid:string,arn:string,accountid:string,username:string>>> COMMENT 'from deserializer',
`eventtime` string COMMENT 'from deserializer',
`eventsource` string COMMENT 'from deserializer',
`eventname` string COMMENT 'from deserializer',
`awsregion` string COMMENT 'from deserializer',
`sourceipaddress` string COMMENT 'from deserializer',
`useragent` string COMMENT 'from deserializer',
`errorcode` string COMMENT 'from deserializer',
`errormessage` string COMMENT 'from deserializer',
`requestparameters` string COMMENT 'from deserializer',
`responseelements` string COMMENT 'from deserializer',
`additionaleventdata` string COMMENT 'from deserializer',
`requestid` string COMMENT 'from deserializer',
`eventid` string COMMENT 'from deserializer',
`resources` array<struct<arn:string,accountid:string,type:string>> COMMENT 'from deserializer',
`eventtype` string COMMENT 'from deserializer',
`apiversion` string COMMENT 'from deserializer',
`readonly` string COMMENT 'from deserializer',
`recipientaccountid` string COMMENT 'from deserializer',
`serviceeventdetails` string COMMENT 'from deserializer',
`sharedeventid` string COMMENT 'from deserializer',
`vpcendpointid` string COMMENT 'from deserializer')
PARTITIONED BY (
`region` string,
`year` string,
`month` string,
`day` string)
ROW FORMAT SERDE
'com.amazon.emr.hive.serde.CloudTrailSerde'
STORED AS INPUTFORMAT
'com.amazon.emr.cloudtrail.CloudTrailInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
's3://{}'
"""
# Format placeholders: table, region, year, month, day, then the partition's s3 path.
partition_alter_statement = 'alter table {} add partition (region=\'{}\', year=\'{}\', month=\'{}\', day=\'{}\') location \'s3://{}\''
def run_query(sql):
    """Submit *sql* to Athena and return the query execution id."""
    return athena_client.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={'Database': athena_database},
        ResultConfiguration={'OutputLocation': athena_result_location})['QueryExecutionId']


def wait_for_query(query_id, poll_interval=1):
    """Poll Athena until *query_id* leaves RUNNING/QUEUED; return its terminal state.

    Sleeps between polls so we do not busy-loop against the Athena API
    (the original polled in a tight loop with no delay).
    """
    while True:
        state = athena_client.get_query_execution(
            QueryExecutionId=query_id)['QueryExecution']['Status']['State']
        if state not in ('RUNNING', 'QUEUED'):
            return state
        time.sleep(poll_interval)


def list_prefixes(prefix):
    """Yield the immediate child prefixes ('directories') under *prefix*.

    Uses the list_objects_v2 paginator: the legacy list_objects call returns
    at most 1000 entries per call, which would silently drop partitions in
    large buckets. `.get(...)` guards against prefixes with no children.
    """
    paginator = s3_client.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=s3_bucket, Delimiter='/', Prefix=prefix):
        for common in page.get('CommonPrefixes', []):
            yield common['Prefix']


# Drop and recreate the table so the schema above is authoritative.
if wait_for_query(run_query(drop_table_statement.format(athena_table))) == 'FAILED':
    sys.exit('unable to drop table: {}'.format(athena_table))

if wait_for_query(run_query(create_table_statement.format(
        athena_table, os.path.join(s3_bucket, s3_path)))) == 'FAILED':
    sys.exit('unable to create table: {}'.format(athena_table))

# Walk region/year/month/day prefixes in the bucket and submit one
# ALTER TABLE ... ADD PARTITION query per day. Queries are submitted
# without waiting so they run concurrently; results are collected below.
partition_queries = {}  # query execution id -> day prefix (for error reporting)
for region_path in list_prefixes(s3_path):
    region = os.path.basename(region_path.strip('/'))
    if not re.match(region_pattern, region):
        continue  # skip non-region prefixes (e.g. digest folders)
    for year_path in list_prefixes(region_path):
        year = os.path.basename(year_path.strip('/'))
        for month_path in list_prefixes(year_path):
            month = os.path.basename(month_path.strip('/'))
            for day_path in list_prefixes(month_path):
                day = os.path.basename(day_path.strip('/'))
                print('adding table partition: {}'.format(day_path))
                query = partition_alter_statement.format(
                    athena_table, region, year, month, day,
                    os.path.join(s3_bucket, day_path))
                partition_queries[run_query(query)] = day_path

# Wait for every partition query; report failures but keep going so one
# bad partition does not abort the rest of the run.
for query_id, partition_path in partition_queries.items():
    if wait_for_query(query_id) == 'FAILED':
        print('ERROR - failed to add partition with path: {}'.format(partition_path))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment