Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save JoaoCarabetta/403cf5f9b777c47db83ae889460df299 to your computer and use it in GitHub Desktop.
Save JoaoCarabetta/403cf5f9b777c47db83ae889460df299 to your computer and use it in GitHub Desktop.
Creates an Athena partitioned table for Waze data
-- DDL for an Athena external table over raw Waze traffic-feed JSON files.
-- Each file is one feed snapshot; jams and alerts are nested arrays within it.
DROP TABLE IF EXISTS main;
CREATE EXTERNAL TABLE main (
-- Snapshot window: epoch milliseconds plus the feed's string timestamps.
endTimeMillis BIGINT,
startTimeMillis BIGINT,
endTime STRING,
startTime STRING,
-- Traffic jams in this snapshot; "line" is the jam's polyline of coordinate
-- points (x/y axis order per the Waze feed -- TODO confirm against feed docs).
jams array<struct<
uuid: STRING,
pubMillis: BIGINT,
startNode: STRING,
endNode: STRING,
roadType: STRING,
country: STRING,
city: STRING,
level: INT,
speedKMH: FLOAT,
length: FLOAT,
turnType: STRING,
type: STRING,
speed: FLOAT,
segments: STRING,
line: array<struct<
x: DOUBLE,
y: DOUBLE>>
>>,
-- User-reported alerts (accidents, hazards, ...) in this snapshot.
alerts array<struct<
uuid: STRING,
pubMillis: BIGINT,
roadType: INT,
-- "location" is a reserved word in Athena DDL, hence the backticks.
`location`: struct<
x: DOUBLE,
y: DOUBLE>,
street: STRING,
city: STRING,
country: STRING,
magvar: INT,
reliability: INT,
reportRating:INT,
confidence: INT,
type: STRING,
subtype: STRING,
reportByMunicipalityUser: BOOLEAN,
nThumbsUp: INT,
jamUuid: STRING,
reportDescription: STRING
>>
)
-- Hive-style partitions; values are registered explicitly via
-- ALTER TABLE ... ADD PARTITION (see the companion Python script).
PARTITIONED BY (year int,
month int,
day int,
hour int)
-- OpenX JSON SerDe: maps each JSON document to one row.
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
-- NOTE(review): placeholder -- this must be the s3://bucket/prefix root of the
-- data (the path above the year/ folders), not an AWS region name.
LOCATION 's3://yourregion';
"""
File structure must be
year/month/day/hour/*.json
"""
# Imports: boto3 and dateutil are third-party dependencies; datetime is stdlib.
import boto3
from dateutil import rrule
from datetime import datetime, timedelta

# Module-level AWS clients shared by the functions below.
s3 = boto3.client('s3')  # NOTE(review): created but never used in this file
# 'yourregion' is a placeholder -- replace with an AWS region name (e.g. 'us-east-1').
athena = boto3.client('athena', 'yourregion')
def run_query(query, database, s3_output):
    """Submit *query* to Athena asynchronously and return the service response.

    query: SQL string to execute.
    database: Athena/Glue database the query runs against.
    s3_output: S3 URI where Athena writes the query results.

    Returns the start_query_execution response dict; the query itself runs
    asynchronously, so success here only means the query was accepted.
    """
    response = athena.start_query_execution(
        QueryString=query,
        QueryExecutionContext={'Database': database},
        ResultConfiguration={'OutputLocation': s3_output},
    )
    print('Execution ID: ' + response['QueryExecutionId'])
    return response
def lambda_handler(event, context, date):
    """Register the year/month/day/hour partition for *date* on the Athena table.

    event, context: unused; kept for AWS-Lambda-style signature compatibility.
        NOTE(review): a real Lambda handler takes only (event, context) -- the
        extra *date* parameter means this cannot be wired directly as a handler.
    date: datetime whose year/month/day/hour name both the partition values
        and the S3 key prefix of the data.

    Returns the Athena start_query_execution response from run_query().
    """
    s3_bucket = 'yourbucket'       # placeholder: bucket holding the Waze JSON dumps
    s3_prefix = 'yourprefix'       # placeholder: key prefix above the year/ folders
    s3_input = 's3://' + s3_bucket + '/' + s3_prefix
    s3_output = 's3://youtoutput'  # placeholder: where Athena writes query results
    database = 'yourdatabase'
    table_name = 'yourtable'
    # Partition values and S3 folder names are unpadded ints (month=3, not 03);
    # the S3 layout must use the same unpadded names -- TODO confirm against
    # whatever job writes the files.
    # IF NOT EXISTS makes re-running the backfill idempotent instead of failing
    # on partitions that were already registered.
    query = (
        "ALTER TABLE {table} ADD IF NOT EXISTS PARTITION "
        "(year={y},month={m},day={d},hour={h}) "
        "location '{path}/{y}/{m}/{d}/{h}';"
    ).format(
        table=table_name,
        path=s3_input,
        y=date.year,
        m=date.month,
        d=date.day,
        h=date.hour,
    )
    print(query)
    return run_query(query, database, s3_output)
# Backfill driver: submit one ADD PARTITION query for every hour in the window.
start = datetime.now() + timedelta(days=-200)
# NOTE(review): +1000 days registers partitions far into the future for data
# that does not exist yet -- confirm this window is intended.
end = datetime.now() + timedelta(days=+1000)
for dt in rrule.rrule(rrule.HOURLY, dtstart=start, until=end):
    try:
        lambda_handler('event', 'context', dt)
    except Exception as exc:
        # Was a bare `except:` -- that also swallowed KeyboardInterrupt and
        # programming errors. Report the failure and move on to the next hour.
        print('Skipping partition for {}: {}'.format(dt, exc))
        continue
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment