
@JoaoCarabetta
Last active February 3, 2022 16:05
Add Time-Based Glue Partitions with AWS Lambda

Creates time-based Glue partitions for a given time range.

Keep in mind that you don't need data to add partitions. So, you can create partitions for a whole year and add the data to S3 later.

import json
import boto3
from dateutil import rrule
from datetime import datetime, timedelta

glue = boto3.client('glue', '--')  # update '--' with your AWS region
s3 = boto3.client('s3')


def get_current_schema(table_name, database_name):
    # Read the storage descriptor and partition keys of the existing Glue table
    response = glue.get_table(
        DatabaseName=database_name,
        Name=table_name)

    table_data = {}
    table_data['input_format'] = response['Table']['StorageDescriptor']['InputFormat']
    table_data['output_format'] = response['Table']['StorageDescriptor']['OutputFormat']
    table_data['table_location'] = response['Table']['StorageDescriptor']['Location']
    table_data['serde_info'] = response['Table']['StorageDescriptor']['SerdeInfo']
    table_data['partition_keys'] = response['Table']['PartitionKeys']

    return table_data


def create_partitions(data, database_name, table_name):
    # batch_create_partition accepts at most 100 partitions per call,
    # so split the input list into chunks of 100
    break_list_in_chunks = lambda data, chunk: [data[x:x + chunk] for x in range(0, len(data), chunk)]

    for i, chunk in enumerate(break_list_in_chunks(data, 100)):
        print(i)
        create_partition_response = glue.batch_create_partition(
            DatabaseName=database_name,
            TableName=table_name,
            PartitionInputList=chunk
        )


def generate_partition_input(year, month, day, hour, s3_input, table_data):
    # Build one PartitionInput dict pointing at the Hive-style S3 path
    part_location = "{}/year={}/month={}/day={}/hour={}".format(s3_input, year, month, day, hour)

    input_dict = {
        'Values': [
            year, month, day, hour
        ],
        'StorageDescriptor': {
            'Location': part_location,
            'InputFormat': table_data['input_format'],
            'OutputFormat': table_data['output_format'],
            'SerdeInfo': table_data['serde_info']
        }
    }

    return input_dict


def generate_partition_input_list(start, end, s3_input, table_data):
    # One partition per hour between start and end (inclusive)
    input_list = []

    for date in rrule.rrule(rrule.HOURLY, dtstart=start, until=end):
        year = str(date.year)
        month = str(date.month)
        day = str(date.day)
        hour = str(date.hour)
        input_list.append(generate_partition_input(year, month, day, hour, s3_input, table_data))

    return input_list


def lambda_handler(event, context):
    # Glue table location
    database_name = 'test'  # update
    table_name = 'raw_data_test'  # update

    # S3 input location
    s3_bucket_glue = '--'  # update
    s3_prefix_glue = '--'  # update
    s3_input_glue = 's3://' + s3_bucket_glue + '/' + s3_prefix_glue

    # Desired time range
    start_time = datetime.now() + timedelta(days=0)
    end_time = datetime.now() + timedelta(days=2)

    # Get Glue table metadata
    table_data = get_current_schema(table_name, database_name)

    # Generate partition list of dicts
    data = generate_partition_input_list(start_time, end_time, s3_input_glue, table_data)

    # Batch insert partitions
    create_partitions(data, database_name, table_name)
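
To double-check that the partitions were registered (for example after pre-creating a whole year before any data lands in S3), you can list them back from the catalog. A minimal sketch, assuming the same 'test' / 'raw_data_test' names used above:

import boto3

glue = boto3.client('glue', '--')  # update '--' with your AWS region

# Page through the registered partitions and count them
paginator = glue.get_paginator('get_partitions')
total = 0
for page in paginator.paginate(DatabaseName='test', TableName='raw_data_test'):
    total += len(page['Partitions'])
print('partitions registered:', total)
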
@raxit65535

Thanks for sharing this code. It is quite helpful.
However, can you give some more idea about what the "read_polygon()" method does?
What are payload['polygons'] and polygon['polygon_slug']?
I am trying to implement something similar, but I have a .csv file in my S3 bucket.
That is, suppose a single .csv file lands in my S3 bucket every day. I just need that data to be available to query via Athena. I already have a Glue catalog table; I will just add a partition and put the data into that partition.
The issue is that when I have 3 dates in my .csv file, the data should go into three different partitions on S3.
Can you shed some light on this?
Thanks!

@JoaoCarabetta
Author

Hey, glad that you liked it. I just put it here for my personal use, which is why it is not so clear.

The polygon stuff is related to my specific use case, but you can replace polygon with an id. In the function generate_partition_input you can see that the polygon is used to map to an S3 path.
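For example, a rough sketch of that replacement (the id name here is just a placeholder, and it assumes the table's partition keys include that id as well):

def generate_partition_input_with_id(id_value, year, month, day, hour, s3_input, table_data):
    # The id becomes one more partition value and one more level in the S3 path
    part_location = "{}/id={}/year={}/month={}/day={}/hour={}".format(
        s3_input, id_value, year, month, day, hour)
    return {
        'Values': [id_value, year, month, day, hour],
        'StorageDescriptor': {
            'Location': part_location,
            'InputFormat': table_data['input_format'],
            'OutputFormat': table_data['output_format'],
            'SerdeInfo': table_data['serde_info']
        }
    }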

About your use case, maybe you don’t need partitions. What is the size of your data? If it is not that big, partitions are just going to annoy you.

If it is big, then, since you have three dates in your csv but are partitioning by date, you have to split the csv. Each part should go to a different S3 path with proper naming. There is no way around it. You can preprocess the csv using Lambda or Batch.
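
Just to give an idea of that preprocessing step, here is a rough sketch (the bucket, prefix and 'date' column names are placeholders, and it assumes pandas is available in your Lambda/Batch environment):

import boto3
import pandas as pd

s3 = boto3.client('s3')

def split_csv_by_date(local_csv_path, bucket, prefix, date_column='date'):
    # Read the whole csv and group rows by the value of the date column
    df = pd.read_csv(local_csv_path, parse_dates=[date_column])
    for date, part in df.groupby(df[date_column].dt.date):
        # One object per date, written under the matching partition path
        key = "{}/year={}/month={}/day={}/part.csv".format(
            prefix, date.year, date.month, date.day)
        s3.put_object(Bucket=bucket, Key=key,
                      Body=part.to_csv(index=False).encode('utf-8'))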

@raxit65535

Thanks for the reply.
I have smaller data as of now. It's a small project for educational purposes, but I am trying to apply all the best practices I can.
Your code is completely clear... I was just unclear about how you iterate over "polygons" in the "generate_partition_input_list()" method and what the value in polygon['polygon_slug'] is!
I will probably take the route of splitting the .csv into three (multiple) files and creating Glue partitions using Lambda, so that I can avoid running the expensive Glue Crawler.
Thanks!
