Creates time-based Glue partitions for a given time range.
Keep in mind that the data does not need to exist before you add partitions, so you can create partitions for a whole year and upload the data to S3 later.
import json
import boto3
from dateutil import rrule
from datetime import datetime, timedelta

# Region placeholder -- update '--' with your AWS region, e.g. 'us-east-1'.
glue = boto3.client('glue', '--')
s3 = boto3.client('s3')
def get_current_schema(table_name, database_name):
    """Fetch storage and partition metadata for an existing Glue table.

    Returns a dict with the table's input/output formats, S3 location,
    serde info, and partition keys, for reuse when building partitions.
    """
    table = glue.get_table(
        DatabaseName=database_name,
        Name=table_name)['Table']
    storage = table['StorageDescriptor']
    return {
        'input_format': storage['InputFormat'],
        'output_format': storage['OutputFormat'],
        'table_location': storage['Location'],
        'serde_info': storage['SerdeInfo'],
        'partition_keys': table['PartitionKeys'],
    }
def create_partitions(data, database_name, table_name):
    """Batch-create Glue partitions in chunks of 100.

    The Glue batch_create_partition API accepts at most 100 partitions
    per call, so the input list is split into chunks of that size.

    Args:
        data: list of PartitionInput dicts (see generate_partition_input).
        database_name: Glue database holding the table.
        table_name: Glue table to attach the partitions to.
    """
    chunk_size = 100  # Glue API limit per batch_create_partition call
    for start in range(0, len(data), chunk_size):
        chunk = data[start:start + chunk_size]
        # NOTE(review): per-partition failures are reported in the
        # response's 'Errors' key and are not inspected here.
        glue.batch_create_partition(
            DatabaseName=database_name,
            TableName=table_name,
            PartitionInputList=chunk,
        )
def generate_partition_input(year, month, day, hour, polygon_slug, s3_input,
                             table_data):
    """Build one Glue PartitionInput dict for a single hour.

    The partition location is laid out Hive-style under the table's S3
    prefix (year=/month=/day=/hour=). `polygon_slug` is accepted but not
    used in the partition values or the S3 path.
    """
    location = "{}/year={}/month={}/day={}/hour={}".format(
        s3_input, year, month, day, hour)
    storage_descriptor = {
        'Location': location,
        'InputFormat': table_data['input_format'],
        'OutputFormat': table_data['output_format'],
        'SerdeInfo': table_data['serde_info'],
    }
    return {
        'Values': [year, month, day, hour],
        'StorageDescriptor': storage_descriptor,
    }
def generate_partition_input_list(start, end, s3_input, polygons, table_data):
    """Build one PartitionInput dict per hour in [start, end].

    Args:
        start, end: datetimes bounding the (inclusive) hourly range.
        s3_input: 's3://bucket/prefix' root of the table's data.
        polygons: unused; kept for backward compatibility with existing
            callers (leftover from an earlier per-polygon scheme).
        table_data: Glue table metadata from get_current_schema().

    Returns:
        list of PartitionInput dicts ready for batch_create_partition.
    """
    input_list = []
    for date in rrule.rrule(rrule.HOURLY, dtstart=start, until=end):
        # Partition values are strings, matching the Glue partition key types.
        # BUG FIX: the original call dropped the polygon_slug argument,
        # shifting the remaining positional args and raising TypeError.
        input_list.append(generate_partition_input(
            str(date.year), str(date.month), str(date.day), str(date.hour),
            None, s3_input, table_data))
    return input_list
def lambda_handler(event, context):
    """Create hourly Glue partitions covering now through two days ahead.

    Partitions can be created before the data exists: Glue only records
    the metadata, and the objects can land in S3 later.
    """
    # Glue table location
    database_name = 'test'  # update
    table_name = 'raw_data_test'  # update

    # S3 location the table reads from
    s3_bucket_glue = '--'  # update
    s3_prefix_glue = '--'  # update
    s3_input_glue = 's3://' + s3_bucket_glue + '/' + s3_prefix_glue

    # Desired time range (the redundant `+ timedelta(days=0)` was removed)
    start_time = datetime.now()
    end_time = datetime.now() + timedelta(days=2)

    # Get Glue table metadata
    table_data = get_current_schema(table_name, database_name)

    # Generate the partition input list.
    # BUG FIX: the original call omitted the (unused) polygons argument,
    # so table_data bound to polygons and the call raised TypeError.
    data = generate_partition_input_list(
        start_time, end_time, s3_input_glue, None, table_data)

    # Batch-insert the partitions
    create_partitions(data, database_name, table_name)
Thanks for the reply.
I have a small amount of data as of now — it's a small project for educational purposes — but I'm trying to apply all the best practices I can.
Your code is completely clear... I was just unclear about why you iterate over "polygons" in the "generate_partition_input_list()" method, and what the value of polygon['polygon_slug'] is supposed to be.
I will probably take the route of splitting the .csv into three (or more) files and creating the Glue partitions with Lambda, so that I can avoid running the expensive Glue Crawler.
Thanks !