@JoaoCarabetta
Last active February 3, 2022 16:05
Add Time-Based Glue Partitions with AWS Lambda

Creates time-based Glue partitions for a given time range.

Keep in mind that you don't need data in S3 to add partitions, so you can create partitions for a whole year and add the data to S3 later.
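For example, once a partition such as year=2022/month=1/day=3/hour=5 is registered, the matching objects can be uploaded at any later point. A minimal sketch of that idea, using a hypothetical bucket, prefix and file name; the year=/month=/day=/hour= layout mirrors what the script below registers:

import boto3

s3 = boto3.client('s3')

# The partition for 2022-01-03 05:00 already exists in the Glue catalog;
# uploading the object later is enough for queries to start returning rows.
s3.upload_file(
    'events.parquet',                                         # hypothetical local file
    'my-bucket',                                              # hypothetical bucket
    'raw_data/year=2022/month=1/day=3/hour=5/events.parquet'  # hypothetical prefix
)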

import json
import boto3
from dateutil import rrule
from datetime import datetime, timedelta

glue = boto3.client('glue', region_name='--')  # update with your region
s3 = boto3.client('s3')


def get_current_schema(table_name, database_name):
    # Read the table definition so new partitions reuse its formats and SerDe
    response = glue.get_table(
        DatabaseName=database_name,
        Name=table_name)

    table_data = {}
    table_data['input_format'] = response['Table']['StorageDescriptor']['InputFormat']
    table_data['output_format'] = response['Table']['StorageDescriptor']['OutputFormat']
    table_data['table_location'] = response['Table']['StorageDescriptor']['Location']
    table_data['serde_info'] = response['Table']['StorageDescriptor']['SerdeInfo']
    table_data['partition_keys'] = response['Table']['PartitionKeys']

    return table_data


def create_partitions(data, database_name, table_name):
    # batch_create_partition accepts at most 100 partitions per call, so chunk the list
    break_list_in_chunks = lambda data, chunk: [data[x:x + chunk] for x in range(0, len(data), chunk)]

    for i, chunk in enumerate(break_list_in_chunks(data, 100)):
        print(i)
        create_partition_response = glue.batch_create_partition(
            DatabaseName=database_name,
            TableName=table_name,
            PartitionInputList=chunk
        )


def generate_partition_input(year, month, day, hour, s3_input, table_data):
    part_location = "{}/year={}/month={}/day={}/hour={}".format(s3_input, year, month, day, hour)

    input_dict = {
        'Values': [
            year, month, day, hour
        ],
        'StorageDescriptor': {
            'Location': part_location,
            'InputFormat': table_data['input_format'],
            'OutputFormat': table_data['output_format'],
            'SerdeInfo': table_data['serde_info']
        }
    }

    return input_dict


def generate_partition_input_list(start, end, s3_input, table_data):
    input_list = []

    # One partition per hour between start and end
    for date in rrule.rrule(rrule.HOURLY, dtstart=start, until=end):
        year = str(date.year)
        month = str(date.month)
        day = str(date.day)
        hour = str(date.hour)

        input_list.append(generate_partition_input(year, month, day, hour, s3_input, table_data))

    return input_list


def lambda_handler(event, context):
    # Glue table location
    database_name = 'test'  # update
    table_name = 'raw_data_test'  # update

    # S3 info location
    s3_bucket_glue = '--'  # update
    s3_prefix_glue = '--'  # update
    s3_input_glue = 's3://' + s3_bucket_glue + '/' + s3_prefix_glue

    # Desired time range
    start_time = datetime.now() + timedelta(days=0)
    end_time = datetime.now() + timedelta(days=2)

    # Get Glue table metadata
    table_data = get_current_schema(table_name, database_name)

    # Generate partition list of dicts
    data = generate_partition_input_list(start_time, end_time, s3_input_glue, table_data)

    # Batch insert partitions
    create_partitions(data, database_name, table_name)
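A rough way to exercise the handler outside of Lambda and confirm the partitions landed in the catalog; this is a sketch, not part of the gist, and it assumes the same 'test' / 'raw_data_test' placeholders used above. Stretching end_time to timedelta(days=365) would pre-create a whole year of hourly partitions, as noted earlier.

if __name__ == '__main__':
    # Invoke the handler directly; event and context are unused.
    lambda_handler({}, None)

    # Spot-check a few of the partitions that were just registered.
    response = glue.get_partitions(
        DatabaseName='test',        # same placeholder as in lambda_handler
        TableName='raw_data_test',  # same placeholder as in lambda_handler
        MaxResults=10
    )
    for partition in response['Partitions']:
        print(partition['Values'], partition['StorageDescriptor']['Location'])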
@raxit65535

Thanks for the reply.
I have a smaller dataset for now; it's a small project for educational purposes, but I'm trying to apply all the best practices I can.
Your code is completely clear... I was just unsure about how you iterate over "polygons" in the "generate_partition_input_list()" method and what the value of polygon['polygon_slug'] is.
I will probably take the route of splitting the .csv into multiple files and creating the Glue partitions with Lambda, so that I can avoid running the expensive Glue Crawler.
Thanks!
