Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save filipelenfers/9e239699bad7408bc84aef1d49cdf867 to your computer and use it in GitHub Desktop.
Save filipelenfers/9e239699bad7408bc84aef1d49cdf867 to your computer and use it in GitHub Desktop.
Remove all partitions from a Glue Table and add again (sometimes solves the HIVE_PARTITION_SCHEMA_MISMATCH in Athena)
# !!!THIS SCRIPT ASSUMES HIVE-COMPATIBLE PARTITIONS!!!
#
# When you get the following error:
# HIVE_PARTITION_SCHEMA_MISMATCH: There is a mismatch between the table and partition schemas. The types are incompatible and cannot be coerced.
# what happened is that a column type changed in the table. Each partition keeps its own copy of the schema,
# and the copy cached by the partitions is no longer compatible with the table's current schema in Glue.
# We need to:
# 1. remove the partitions
# 2. add the partitions again
#
# !!!THIS SCRIPT ASSUMES HIVE-COMPATIBLE PARTITIONS!!!
import boto3
import time
# Configuration -- edit these values before running the script.
PROFILE = 'dev'                                   # profile name passed to boto3.Session
DATABASE_NAME = 'default'                         # Glue database that holds the table
TABLE_NAME = 'my_table'                           # table whose partitions are rebuilt
BATCH = 25                                        # partitions sent per batch_delete_partition call
ATHENA_WORKGROUP = 'primary'                      # workgroup used to run the MSCK REPAIR query
ATHENA_OUTPUT_LOCATION = 's3://my-result-bucket'  # S3 location for the Athena query result

# One shared session; both service clients are created from it.
session = boto3.Session(profile_name=PROFILE)
glue_client = session.client("glue")
athena_client = session.client("athena")
# 1. Collect the values of every partition currently registered for the table.
#    The complete "Values" list of each partition is kept (one entry per
#    partition key), so tables partitioned by more than one column work too.
to_delete = []
next_token = None
while True:
    params = {
        'DatabaseName': DATABASE_NAME,
        'TableName': TABLE_NAME,
    }
    if next_token:
        params["NextToken"] = next_token
    response = glue_client.get_partitions(**params)
    to_delete.extend(partition["Values"] for partition in response['Partitions'])
    # No continuation token in the response means this was the last page.
    next_token = response.get("NextToken")
    if not next_token:
        break

# 2. Delete the partitions, BATCH partitions per request.
delete_errors = []
for i in range(0, len(to_delete), BATCH):
    batch_to_delete = [{"Values": values} for values in to_delete[i:i + BATCH]]
    print(batch_to_delete)
    response = glue_client.batch_delete_partition(
        DatabaseName=DATABASE_NAME,
        TableName=TABLE_NAME,
        PartitionsToDelete=batch_to_delete)
    print()
    print(response)
    print()
    # Per-partition failures are reported in the response rather than raised;
    # remember them so they are not lost in the batch output above.
    delete_errors.extend(response.get("Errors", []))

# Report failures explicitly, but keep going: the repair step that follows
# still re-adds the partitions that were removed successfully.
if delete_errors:
    print(f"WARNING: {len(delete_errors)} partition(s) could not be deleted:")
    for error in delete_errors:
        print(error)
    print()
# 3. Add the partitions again, using MSCK REPAIR.
# !!!THIS SCRIPT ASSUMES HIVE-COMPATIBLE PARTITIONS!!!
response = athena_client.start_query_execution(
    QueryString=f'MSCK REPAIR TABLE {DATABASE_NAME}.{TABLE_NAME};',
    QueryExecutionContext={
        'Database': DATABASE_NAME
    },
    ResultConfiguration={
        'OutputLocation': ATHENA_OUTPUT_LOCATION
    },
    WorkGroup=ATHENA_WORKGROUP
)
query_execution_id = response['QueryExecutionId']

# Poll every 5 seconds until the query leaves the in-progress states.
while True:
    response = athena_client.get_query_execution(QueryExecutionId=query_execution_id)
    status = response['QueryExecution']['Status']
    if status['State'] not in ('QUEUED', 'RUNNING'):
        break
    print("Waiting MSCK REPAIR to complete." )
    time.sleep(5)

print()
print(response)
print()

# The partitions were deleted in step 2, so a repair that did not succeed
# leaves the table without them. Fail loudly instead of exiting normally.
if status['State'] != 'SUCCEEDED':
    raise RuntimeError(
        f"MSCK REPAIR TABLE ended in state {status['State']}: "
        f"{status.get('StateChangeReason', 'no reason reported')}"
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment