Created
November 5, 2020 20:01
-
-
Save dallasmarlow/eac82c54cec36479f32748fd5d41d2e7 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
import os.path
import re
import sys
import time

import boto3
# Region handed to the boto3 S3 and Athena clients.
aws_region = 'us-east-2'
# Matches a complete AWS region name: two or more lowercase segments followed
# by a numeric suffix (us-east-2, ap-southeast-1, us-gov-west-1). Anchored at
# both ends so look-alike prefixes such as "us-east-2-backup" are rejected.
region_pattern = r'^[a-z]+(?:-[a-z]+)+-[0-9]+$'
# Bucket that holds the CloudTrail logs to be indexed.
s3_bucket = 'periscope-cloudtrail-audit'
# Key prefix whose immediate children are listed as region directories.
# NOTE(review): "<account id>" looks like a placeholder to be replaced with
# the real AWS account id before running -- confirm.
s3_path = 'AWSLogs/<account id>/CloudTrail/'
# One boto3 client per service, created in the order S3 then Athena and both
# bound to aws_region.
s3_client, athena_client = (
    boto3.client(service_name, region_name=aws_region)
    for service_name in ('s3', 'athena')
)
# Athena database the statements run in, and the table the script rebuilds.
athena_database, athena_table = "cloudtrail", "cloudtrail_logs"
# S3 URI supplied as the OutputLocation for every Athena query.
# NOTE(review): "<account id>" looks like a placeholder -- confirm.
athena_result_location = "s3://<account id>-athena-results/cloudtrail"
# Template whose single placeholder receives the table name.
drop_table_statement = "drop table if exists {}"
# DDL template for the external CloudTrail table.
# Placeholders, in order: the table name, then the S3 location without the
# "s3://" scheme (bucket plus key prefix).
create_table_statement = """
CREATE EXTERNAL TABLE `{}`(
`eventversion` string COMMENT 'from deserializer',
`useridentity` struct<type:string,principalid:string,arn:string,accountid:string,invokedby:string,accesskeyid:string,username:string,sessioncontext:struct<attributes:struct<mfaauthenticated:string,creationdate:string>,sessionissuer:struct<type:string,principalid:string,arn:string,accountid:string,username:string>>> COMMENT 'from deserializer',
`eventtime` string COMMENT 'from deserializer',
`eventsource` string COMMENT 'from deserializer',
`eventname` string COMMENT 'from deserializer',
`awsregion` string COMMENT 'from deserializer',
`sourceipaddress` string COMMENT 'from deserializer',
`useragent` string COMMENT 'from deserializer',
`errorcode` string COMMENT 'from deserializer',
`errormessage` string COMMENT 'from deserializer',
`requestparameters` string COMMENT 'from deserializer',
`responseelements` string COMMENT 'from deserializer',
`additionaleventdata` string COMMENT 'from deserializer',
`requestid` string COMMENT 'from deserializer',
`eventid` string COMMENT 'from deserializer',
`resources` array<struct<arn:string,accountid:string,type:string>> COMMENT 'from deserializer',
`eventtype` string COMMENT 'from deserializer',
`apiversion` string COMMENT 'from deserializer',
`readonly` string COMMENT 'from deserializer',
`recipientaccountid` string COMMENT 'from deserializer',
`serviceeventdetails` string COMMENT 'from deserializer',
`sharedeventid` string COMMENT 'from deserializer',
`vpcendpointid` string COMMENT 'from deserializer')
PARTITIONED BY (
`region` string,
`year` string,
`month` string,
`day` string)
ROW FORMAT SERDE
'com.amazon.emr.hive.serde.CloudTrailSerde'
STORED AS INPUTFORMAT
'com.amazon.emr.cloudtrail.CloudTrailInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
's3://{}'
"""

# Template that registers one day of logs as a partition.
# Placeholders, in order: table, region, year, month, day, and the S3
# location without the "s3://" scheme.
partition_alter_statement = (
    "alter table {} add partition "
    "(region='{}', year='{}', month='{}', day='{}') "
    "location 's3://{}'"
)
# Athena query states that mean the statement did not (and will not) succeed.
QUERY_FAILED_STATES = ('FAILED', 'CANCELLED')
# Seconds to sleep between status polls so the wait loop does not spin on the
# Athena API.
QUERY_POLL_INTERVAL = 1.0
# Most partition statements left running at once before waiting for them.
# NOTE(review): 20 is assumed to match the default Athena quota for active
# DDL queries -- confirm against this account's service quotas.
MAX_IN_FLIGHT_QUERIES = 20


def start_query(client, statement):
    """Submit ``statement`` to Athena and return its QueryExecutionId.

    The query runs in ``athena_database`` and writes its results to
    ``athena_result_location`` (both module-level settings).
    """
    response = client.start_query_execution(
        QueryString=statement,
        QueryExecutionContext={'Database': athena_database},
        ResultConfiguration={'OutputLocation': athena_result_location})
    return response['QueryExecutionId']


def wait_for_query(client, query_id, poll_interval=QUERY_POLL_INTERVAL):
    """Block until the query leaves QUEUED/RUNNING and return its final state.

    Sleeps ``poll_interval`` seconds between polls instead of busy-waiting.
    """
    while True:
        execution = client.get_query_execution(QueryExecutionId=query_id)
        state = execution['QueryExecution']['Status']['State']
        if state not in ('QUEUED', 'RUNNING'):
            return state
        time.sleep(poll_interval)


def list_child_prefixes(client, bucket, prefix):
    """Yield the immediate "sub-directory" prefixes found under ``prefix``.

    Uses a paginator so listings longer than one response page are not
    truncated, and tolerates pages that carry no ``CommonPrefixes`` key
    (which is what S3 returns for a prefix with no children).
    """
    paginator = client.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket, Delimiter='/', Prefix=prefix):
        for entry in page.get('CommonPrefixes', []):
            yield entry['Prefix']


def last_path_segment(prefix):
    """Return the final '/'-separated component of an S3 key prefix."""
    return prefix.strip('/').rsplit('/', 1)[-1]


def iter_day_partitions(client, bucket, root_prefix, region_regex):
    """Walk region/year/month/day prefixes below ``root_prefix``.

    Yields ``(region, year, month, day, day_prefix)`` tuples. Top-level
    prefixes whose name does not match ``region_regex`` are skipped.
    """
    for region_path in list_child_prefixes(client, bucket, root_prefix):
        region = last_path_segment(region_path)
        if not re.match(region_regex, region):
            continue
        for year_path in list_child_prefixes(client, bucket, region_path):
            year = last_path_segment(year_path)
            for month_path in list_child_prefixes(client, bucket, year_path):
                month = last_path_segment(month_path)
                for day_path in list_child_prefixes(client, bucket, month_path):
                    yield region, year, month, last_path_segment(day_path), day_path


def drain_partition_queries(client, pending):
    """Wait for every query in ``pending`` and report the ones that failed.

    ``pending`` maps QueryExecutionId -> partition path; it is emptied before
    returning. Failures are printed and do not abort the run.
    """
    for query_id, partition_path in pending.items():
        if wait_for_query(client, query_id) in QUERY_FAILED_STATES:
            print('ERROR - failed to add partition with path: {}'.format(partition_path))
    pending.clear()


def main():
    """Drop and recreate the Athena table, then add one partition per day."""
    # drop table
    query_id = start_query(athena_client, drop_table_statement.format(athena_table))
    if wait_for_query(athena_client, query_id) in QUERY_FAILED_STATES:
        sys.exit('unable to drop table: {}'.format(athena_table))

    # create table
    # S3 locations are joined with '/' explicitly; os.path.join would use the
    # host platform's separator.
    table_location = '{}/{}'.format(s3_bucket, s3_path)
    query_id = start_query(
        athena_client,
        create_table_statement.format(athena_table, table_location))
    if wait_for_query(athena_client, query_id) in QUERY_FAILED_STATES:
        sys.exit('unable to create table: {}'.format(athena_table))

    # add partitions
    pending = {}
    partitions = iter_day_partitions(s3_client, s3_bucket, s3_path, region_pattern)
    for region, year, month, day, day_path in partitions:
        print('adding table partition: {}'.format(day_path))
        statement = partition_alter_statement.format(
            athena_table,
            region,
            year,
            month,
            day,
            '{}/{}'.format(s3_bucket, day_path))
        pending[start_query(athena_client, statement)] = day_path
        # Cap the number of outstanding statements rather than queueing one
        # query per day of logs all at once.
        if len(pending) >= MAX_IN_FLIGHT_QUERIES:
            drain_partition_queries(athena_client, pending)
    drain_partition_queries(athena_client, pending)


# Running the file as a script performs the same drop/create/partition
# sequence as before; importing it no longer triggers AWS calls.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment