AWS Athena: automatically add partitions for CloudTrail logs between any two given dates via Lambda / Python
# Lambda function / Python to create Athena partitions for CloudTrail logs between any two given dates.
# If you run this in AWS Lambda, it may not be able to create all the partitions,
# because a Lambda function can run for at most 5 minutes, while each create-partition query takes about 6 seconds on average.
# In a benchmark on Lambda, it created up to 3 months of partitions across 16 regions.
'''
-----------------------------------------------------------------
AWS Athena Create Partitions Automatically For Any Given TWO DATES
-----------------------------------------------------------------
Version 1.0
Author: SqlAdmin
Twitter: https://twitter.com/SqlAdmin
License: Free for educational purposes.
NOTE:
-----
1) If you run this in AWS Lambda, it can create only about 4 months of partitions.
2) This will not report whether the Athena query succeeded or not, but it
will return the Query Execution ID.
HOW THIS WORKS:
---------------
1) It checks the list of regions for which CloudTrail logs were captured in
S3. Some people use only a particular region, so they won't
have any logs for the other regions.
2) Then it executes the create-partition queries against all
of those regions for the given date range.
3) Once an ALTER TABLE query is triggered, it waits 2 seconds before running the next query,
because Athena allows only up to 20 concurrent queries; hence the 2-second sleep.
Example CloudTrail Path:
------------------------
s3://bucket/AWSLogs/Account_ID/Cloudtrail/regions/year/month/day/log_files
PARAMETERS THAT NEED TO CHANGE:
-------------------------------
1) s3_bucket - Bucket name where your CloudTrail logs are stored.
2) s3_prefix - Path to your CloudTrail logs (give the prefix before the regions.
For eg: s3://bucket/AWSLogs/AccountID/Cloudtrail/regions/year/month/day/log_files
So you need to use the path: AWSLogs/AccountID/Cloudtrail/ )
3) s3_output - Path where your Athena query results should be saved.
4) database - Name of the database where your CloudTrail logs table is located.
5) table_name - Name of the table that holds your CloudTrail logs.
6) start_date - Start date for creating partitions.
7) end_date - Last date for creating partitions.
DEBUGGING:
----------
1) Comment out the line run_query(query, database, s3_output)
2) Uncomment the lines print(get_region) and print(query)
---------------------------------------------------------------------------------'''
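# For reference, a query generated by this script looks roughly like the following
# (the bucket name, account ID and table name come from the placeholder parameters below,
# so substitute your own values):
#
#   ALTER TABLE cl_log_tbl ADD PARTITION (region='us-east-1',year=2017,month=01,day=01)
#   location 's3://sqladmin-cloudtrail/AWSLogs/XXXXXXXXXXXX/CloudTrail/us-east-1/2017/01/01/';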
# Import libraries
import boto3
import datetime
import time
from datetime import timedelta, date

# Clients for S3 and Athena
s3 = boto3.client('s3')
athena = boto3.client('athena')

# Parameters for the S3 log location and the Athena table
# Fill these carefully (read the commented section on top for help)
s3_bucket = 'sqladmin-cloudtrail'
s3_prefix = 'AWSLogs/XXXXXXXXXXXX/CloudTrail/'
s3_input = 's3://' + s3_bucket + '/' + s3_prefix
s3_output = 's3://aws-athena-query-results-XXXXXXXXXXXX-us-east-1'
database = 'aws_log_database'
table_name = 'cl_log_tbl'
start_date = date(2017, 1, 1)
end_date = date(2017, 4, 30)
# Generator that yields each date (day, month, year) between the two given dates
def daterange(start_date, end_date):
    for n in range(int((end_date - start_date).days)):
        yield start_date + timedelta(n)
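# For example, daterange(date(2017, 1, 1), date(2017, 1, 4)) yields
# 2017-01-01, 2017-01-02 and 2017-01-03; note that the end date itself is excluded.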
# Execute the Athena query
def run_query(query, database, s3_output):
    query_response = athena.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': database
        },
        ResultConfiguration={
            'OutputLocation': s3_output,
        }
    )
    print('Execution ID: ' + query_response['QueryExecutionId'])
    return query_response
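# Optional helper (not used by lambda_handler): as noted in the docstring, this script only
# returns the Query Execution ID and does not check whether the query succeeded. A minimal
# sketch of how one could poll the query status with boto3, assuming a 2-second polling
# interval is acceptable:
def wait_for_query(execution_id, poll_seconds=2):
    # Poll Athena until the query leaves the QUEUED/RUNNING states, then return the final state
    while True:
        response = athena.get_query_execution(QueryExecutionId=execution_id)
        state = response['QueryExecution']['Status']['State']
        if state not in ('QUEUED', 'RUNNING'):
            return state  # SUCCEEDED, FAILED or CANCELLED
        time.sleep(poll_seconds)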
# Main function: get the regions captured in S3 and run the partition queries against each of them
def lambda_handler(event, context):
    result = s3.list_objects(Bucket=s3_bucket, Prefix=s3_prefix, Delimiter='/')
    for regions in result.get('CommonPrefixes'):
        get_region = regions.get('Prefix', '').replace(s3_prefix, '').replace('/', '')
        for single_date in daterange(start_date, end_date):
            partition_day = str(single_date.day).rjust(2, '0')
            partition_month = str(single_date.month).rjust(2, '0')
            partition_year = str(single_date.year)
            print(partition_day, partition_month, partition_year)
            query = ("ALTER TABLE " + table_name + " ADD PARTITION (region='"
                     + get_region + "',year="
                     + partition_year + ",month="
                     + partition_month + ",day="
                     + partition_day
                     + ") location '" + s3_input
                     + get_region
                     + "/" + partition_year + "/" + partition_month + "/"
                     + partition_day + "';")
            # print(get_region)
            # print(query)
            run_query(query, database, s3_output)
            time.sleep(2)
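# A quick way to test this outside Lambda (a sketch; assumes your local AWS credentials
# and default region are already configured for boto3):
if __name__ == '__main__':
    lambda_handler({}, None)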