Created
January 15, 2019 16:41
-
-
Save JoaoCarabetta/403cf5f9b777c47db83ae889460df299 to your computer and use it in GitHub Desktop.
Creates an Athena partitioned table for Waze data
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Athena table over Waze JSON snapshots, partitioned by hour.
-- Athena runs one statement per query: execute DROP and CREATE separately.
-- DROP on an EXTERNAL table removes only metadata (including every
-- registered partition); the JSON files in S3 are not deleted, but the
-- partitions must be added again after the table is recreated.
DROP TABLE IF EXISTS main;

-- Partition values (year, month, day, hour) come from the folder path, not
-- from the JSON; each year/month/day/hour/ folder under LOCATION is
-- registered with ALTER TABLE main ADD PARTITION (...) LOCATION '...'.
-- LOCATION must be the S3 bucket/prefix (with a trailing slash) that holds
-- those folders; an AWS region is not a valid value here.
-- NOTE(review): jams.roadType is STRING while alerts.roadType is INT —
-- confirm both against the feed.
CREATE EXTERNAL TABLE main (
    endTimeMillis BIGINT,
    startTimeMillis BIGINT,
    endTime STRING,
    startTime STRING,
    jams array<struct<
        uuid: STRING,
        pubMillis: BIGINT,
        startNode: STRING,
        endNode: STRING,
        roadType: STRING,
        country: STRING,
        city: STRING,
        level: INT,
        speedKMH: FLOAT,
        length: FLOAT,
        turnType: STRING,
        type: STRING,
        speed: FLOAT,
        segments: STRING,
        line: array<struct<
            x: DOUBLE,
            y: DOUBLE>>
    >>,
    alerts array<struct<
        uuid: STRING,
        pubMillis: BIGINT,
        roadType: INT,
        `location`: struct<
            x: DOUBLE,
            y: DOUBLE>,
        street: STRING,
        city: STRING,
        country: STRING,
        magvar: INT,
        reliability: INT,
        reportRating: INT,
        confidence: INT,
        type: STRING,
        subtype: STRING,
        reportByMunicipalityUser: BOOLEAN,
        nThumbsUp: INT,
        jamUuid: STRING,
        reportDescription: STRING
    >>
)
PARTITIONED BY (
    year INT,
    month INT,
    day INT,
    hour INT
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://yourbucket/yourprefix/';
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
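The S3 file structure must be
    year/month/day/hour/*.json
with unpadded numbers (e.g. 2019/1/15/16/), because each partition path is
built from str(date.year), str(date.month), str(date.day) and str(date.hour).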
""" | |
#Import libraries | |
import boto3 | |
from dateutil import rrule | |
from datetime import datetime, timedelta | |
# AWS clients created once at import time. Only `athena` is used by the code
# in this script; NOTE(review): `s3` appears unused — confirm before removing.
s3 = boto3.client('s3')
athena = boto3.client('athena', region_name='yourregion')
def run_query(query, database, s3_output):
    """Submit *query* to Athena and return the StartQueryExecution response.

    This only starts the query; it does not poll for completion, so a query
    that later fails inside Athena is not reported by this function.

    Args:
        query: SQL text to execute.
        database: database used as the query's execution context.
        s3_output: S3 URI where Athena writes the query results.

    Returns:
        The response dict returned by ``athena.start_query_execution``.
    """
    context = {'Database': database}
    result_config = {'OutputLocation': s3_output}
    response = athena.start_query_execution(
        QueryString=query,
        QueryExecutionContext=context,
        ResultConfiguration=result_config,
    )
    execution_id = response['QueryExecutionId']
    print('Execution ID: ' + execution_id)
    return response
def _add_partition_query(table_name, s3_input, date):
    """Build an ALTER TABLE statement registering the hour partition of *date*.

    Uses ADD IF NOT EXISTS so re-running the backfill over hours that are
    already registered succeeds instead of failing. Partition values and path
    components are the unpadded str() of each field (month 1 -> "1"), and the
    partition location ends with "/" as Athena expects for folder locations.
    """
    year, month, day, hour = (
        str(date.year), str(date.month), str(date.day), str(date.hour))
    location = '/'.join([s3_input, year, month, day, hour]) + '/'
    return (
        'ALTER TABLE ' + table_name + ' ADD IF NOT EXISTS PARTITION ('
        'year=' + year + ',month=' + month + ',day=' + day + ',hour=' + hour
        + ") location '" + location + "';"
    )


def lambda_handler(event, context, date):
    """Register the Athena partition for the hour containing *date*.

    Despite its name this is called by the backfill loop in this script:
    AWS Lambda invokes handlers with only (event, context), so *date* must be
    supplied by the caller. *event* and *context* are accepted but unused.

    Args:
        event: unused.
        context: unused.
        date: datetime whose year/month/day/hour select the partition.

    Returns:
        The StartQueryExecution response from run_query.
    """
    # Placeholders: replace with the real bucket/prefix holding the
    # year/month/day/hour/ folders, the results location, and the table.
    s3_bucket = 'yourbucket'
    s3_prefix = 'yourprefix'
    s3_input = 's3://' + s3_bucket + '/' + s3_prefix
    s3_output = 's3://youroutput'
    database = 'yourdatabase'
    table_name = 'yourtable'

    query = _add_partition_query(table_name, s3_input, date)
    print(query)
    return run_query(query, database, s3_output)
# Backfill driver: register one partition per hour from 200 days ago to 1000
# days ahead (~28,800 calls), presumably so future hourly folders are
# queryable as soon as they appear.
# NOTE(review): this runs at import time; if the module is ever deployed as a
# Lambda it would fire on every cold start — consider an
# `if __name__ == "__main__":` guard.
start = datetime.now() + timedelta(days=-200)
end = datetime.now() + timedelta(days=+1000)
failed_hours = []
for dt in rrule.rrule(rrule.HOURLY, dtstart=start, until=end):
    try:
        lambda_handler('event', 'context', dt)
    except Exception as exc:
        # Best effort: keep going, but never lose a failed hour silently
        # (e.g. API throttling). Catching Exception, not a bare except,
        # lets Ctrl-C / SystemExit still stop the loop.
        failed_hours.append(dt)
        print('Failed to add partition for {:%Y-%m-%d %H}:00: {!r}'.format(dt, exc))
if failed_hours:
    print('{} partition(s) failed; re-run to retry them.'.format(len(failed_hours)))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment