Skip to content

Instantly share code, notes, and snippets.

@mbiemann
Created September 9, 2020 04:13
Show Gist options
  • Save mbiemann/4fc92a6fd49a6414e20f05744496a78e to your computer and use it in GitHub Desktop.
Save mbiemann/4fc92a6fd49a6414e20f05744496a78e to your computer and use it in GitHub Desktop.
Python script to run on AWS that finds every Apache Hudi table in the Glue catalog and adds any missing partitions via Athena, based on the parquet files present in S3.
# Sync Glue-catalog partitions for Apache Hudi tables.
#
# Scans the Glue catalog for every table whose input format marks it as Hudi,
# lists the parquet files under each table's S3 location, and registers any
# partition directory that exists in S3 but not in Glue by running
# "ALTER TABLE ... ADD IF NOT EXISTS PARTITION" through Athena.
#
# NOTE(review): this script was recovered from a whitespace-mangled paste;
# the nesting was reconstructed from the loop/try structure, and the leading
# space widths inside the progress print strings are a best-effort
# reconstruction — confirm against the original gist if exact console
# formatting matters.
import json, time

import boto3

# Athena requires an output location for query results; replace the
# ACCOUNTID / REGION placeholders before running.
athena_output = 's3://aws-athena-query-results-ACCOUNTID-REGION'

athena = boto3.client('athena')
glue = boto3.client('glue')
s3 = boto3.client('s3')

# Paginate through every Glue table whose InputFormat identifies it as Hudi.
# The pagination idiom used throughout: the "next" dict is {} on the first
# call, {'NextToken': ...} while more pages remain, and None to stop.
glue_search_next = {}
while glue_search_next is not None:
    rTables = glue.search_tables(
        **{'Filters': [{
            'Key': 'InputFormat',
            'Value': 'org.apache.hudi.hadoop.HoodieParquetInputFormat'
        }]},
        **glue_search_next
    )
    glue_search_next = {'NextToken': rTables['NextToken']} if 'NextToken' in rTables else None
    if 'TableList' in rTables:
        for srcTable in rTables['TableList']:
            # Uncomment to restrict the run to a single table:
            # if not (srcTable['DatabaseName'] == 'databaseName' and srcTable['Name'] == 'tableName'):
            #     continue
            # Re-check the input format (search_tables matching is loose)
            # and skip tables without partition keys.
            if srcTable['StorageDescriptor']['InputFormat'] != 'org.apache.hudi.hadoop.HoodieParquetInputFormat':
                continue
            if not srcTable['PartitionKeys']:
                continue
            database = srcTable['DatabaseName']
            table = srcTable['Name']
            location = srcTable['StorageDescriptor']['Location']
            print(database)
            print(table)
            print(location)
            partKeys = [key['Name'] for key in srcTable['PartitionKeys']]
            print(partKeys)

            # Collect the partitions already registered in Glue, each encoded
            # as a "key1=val1/key2=val2" string for cheap membership tests.
            partList = []
            glue_getpart_next = {}
            while glue_getpart_next is not None:
                rParts = glue.get_partitions(
                    **{
                        'DatabaseName': database,
                        'TableName': table
                    },
                    **glue_getpart_next
                )
                glue_getpart_next = {'NextToken': rParts['NextToken']} if 'NextToken' in rParts else None
                if 'Partitions' in rParts:
                    for srcPartition in rParts['Partitions']:
                        if len(partKeys) != len(srcPartition['Values']):
                            print(partKeys)
                            print(srcPartition['Values'])
                            print('ERRO: Partições não coincidem com Valores')
                            continue
                        pairs = [
                            f'''{partKeys[i]}={srcPartition['Values'][i]}'''
                            for i in range(len(partKeys))
                        ]
                        partList += ['/'.join(pairs)]
            # print(partList)

            # Split the table location into bucket and key prefix
            # (handles both s3:// and s3a:// schemes).
            locationSplit = location.replace('s3://', '').replace('s3a://', '').split('/')
            bucket = locationSplit[0]
            prefix = '/'.join(locationSplit[1:])
            prefix = prefix if prefix.endswith('/') else prefix + '/'

            # Walk every parquet object under the table location and queue an
            # ADD PARTITION for each partition directory Glue doesn't know.
            addPartition = []
            s3_list_next = {}
            while s3_list_next is not None:
                rFiles = s3.list_objects_v2(
                    **{
                        'Bucket': bucket,
                        'Prefix': prefix
                    },
                    **s3_list_next
                )
                s3_list_next = {'ContinuationToken': rFiles['NextContinuationToken']} if 'NextContinuationToken' in rFiles else None
                if 'Contents' in rFiles:
                    for srcFile in rFiles['Contents']:
                        if not srcFile['Key'].endswith('.parquet'):
                            continue
                        # BUGFIX: strip only the *leading* prefix. The original
                        # used str.replace(prefix, ''), which also deletes any
                        # later occurrence of the prefix inside the key and
                        # would corrupt the derived partition values.
                        key = srcFile['Key'][len(prefix):]
                        # Drop the file name; the remaining path segments are
                        # the Hive-style "key=value" partition directories.
                        values = key.split('/')[:-1]
                        if len(partKeys) != len(values):
                            print(partKeys)
                            print(values)
                            print('ERRO: Partições não coincidem com Valores')
                            continue
                        part = '/'.join(values)
                        if part not in partList and part not in addPartition:
                            print(f'    new partition {part}')
                            addPartition += [part]
                            # Turn "k1=v1/k2=v2" into "k1='v1',k2='v2'" for
                            # the PARTITION (...) clause.
                            partCols = ','.join(part.split('/'))
                            partCols = partCols.replace('=', '=\'').replace(',', '\',') + '\''
                            sql = f"ALTER TABLE {database}.{table} ADD IF NOT EXISTS PARTITION ({partCols}) LOCATION 's3://{bucket}/{prefix}{part}'"
                            print('        ' + sql)

                            # Submit the DDL, retrying with linear backoff on
                            # Athena throttling; give up after 100 attempts.
                            count = 0
                            while True:
                                count += 1
                                try:
                                    r = athena.start_query_execution(
                                        QueryString=sql,
                                        QueryExecutionContext={
                                            'Database': database
                                        },
                                        ResultConfiguration={
                                            'OutputLocation': athena_output
                                        }
                                    )
                                    queryId = r['QueryExecutionId']
                                    break
                                except Exception as e:
                                    if 'ThrottlingException' in str(e):
                                        print(str(e))
                                        time.sleep(1 * count)
                                    else:
                                        # BUGFIX: re-raise the original
                                        # exception instead of wrapping it in
                                        # a bare Exception(e), preserving its
                                        # type and traceback for the caller.
                                        raise
                                if count == 100:
                                    raise Exception('Too many retries')

                            # Poll the query until it leaves QUEUED/RUNNING,
                            # again backing off linearly, capped at 100 polls.
                            count = 0
                            while True:
                                count += 1
                                r = athena.get_query_execution(
                                    QueryExecutionId=queryId
                                )
                                status = r['QueryExecution']['Status']['State']
                                print('            ' + status)
                                if status in ['QUEUED', 'RUNNING']:
                                    time.sleep(1 * count)
                                else:
                                    if 'StateChangeReason' in r['QueryExecution']['Status']:
                                        print('                ' + r['QueryExecution']['Status']['StateChangeReason'])
                                    break
                                if count == 100:
                                    raise Exception('Too many retries')
            # Visual separator between tables in the console output.
            print('=================================================')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment