Skip to content

Instantly share code, notes, and snippets.

@ferrouswheel
Forked from JoaoCarabetta/README.md
Created April 24, 2020 01:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ferrouswheel/8f81aa96082cf0e96e16a420f9ea6270 to your computer and use it in GitHub Desktop.
Save ferrouswheel/8f81aa96082cf0e96e16a420f9ea6270 to your computer and use it in GitHub Desktop.
Add Glue Partitions with Lambda AWS
import json
import boto3
from dateutil import rrule
from datetime import datetime, timedelta
glue = boto3.client('glue', '--') # Update with your location
s3 = boto3.client('s3')
def get_current_schema(table_name, database_name):
response = glue.get_table(
DatabaseName=database_name,
Name=table_name)
table_data = {}
table_data['input_format'] = response['Table']['StorageDescriptor']['InputFormat']
table_data['output_format'] = response['Table']['StorageDescriptor']['OutputFormat']
table_data['table_location'] = response['Table']['StorageDescriptor']['Location']
table_data['serde_info'] = response['Table']['StorageDescriptor']['SerdeInfo']
table_data['partition_keys'] = response['Table']['PartitionKeys']
return table_data
def create_partitions(data, database_name, table_name):
break_list_in_chunks = lambda data, chunk: [data[x:x+chunk] for x in range(0, len(data), chunk)]
for i, data in enumerate(break_list_in_chunks(data, 100)):
print(i)
reate_partition_response = glue.batch_create_partition(
DatabaseName=database_name,
TableName=table_name,
PartitionInputList=data
)
def generate_partition_input(year, month, day, hour, polygon_slug, s3_input,
table_data):
part_location = "{}/year={}/month={}/day={}/hour={}/polygon_slug={}".format(s3_input, year, month, day, hour, polygon_slug)
input_dict = {
'Values': [
year, month, day, hour, polygon_slug
],
'StorageDescriptor': {
'Location': part_location,
'InputFormat': table_data['input_format'],
'OutputFormat': table_data['output_format'],
'SerdeInfo': table_data['serde_info']
}
}
return input_dict
def generate_partition_input_list(start, end, s3_input, polygons, table_data):
input_list = []
for date in rrule.rrule(rrule.HOURLY, dtstart=start, until=end):
year = str(date.year)
month = str(date.month)
day = str(date.day)
hour = str(date.hour)
for polygon in polygons:
input_list.append(generate_partition_input(year, month, day, hour, polygon['polygon_slug'], s3_input,
table_data))
return input_list
def read_polygon(bucket, key):
s3 = boto3.client('s3')
response = s3.get_object(Bucket=bucket, Key=key)
payload = json.loads(response['Body'].read().decode('utf-8'))
return payload['polygons']
def lambda_handler(event, context):
# Glue table location
database_name = 'test' # update
table_name = 'raw_data_test' # update
# S3 info location
s3_buckcet_glue = '--' # update
s3_prefix_glue = '--' # update
s3_input_glue = 's3://' + s3_buckcet_glue + '/' + s3_prefix_glue
# Support polygons index
s3_buckcet_polygon = '--' # update
s3_key_polygon = '--.json' # update
# Desired time range
start_time = datetime.now() + timedelta(days=0)
end_time = datetime.now() + timedelta(days=2)
# Get polygons
polygons = read_polygon(s3_buckcet_polygon, s3_key_polygon)
# Get Glue table metadata
table_data = get_current_schema(table_name, database_name)
# Generate partition list of dicts
data = generate_partition_input_list(start_time, end_time, s3_input_glue,
polygons, table_data)
# Batch insert partitions
create_partitions(data, database_name, table_name)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment