Skip to content

Instantly share code, notes, and snippets.

@dharamsk
Created October 17, 2018 05:40
Show Gist options
  • Save dharamsk/ce66dc8ce211d3436034bec4a37e4be4 to your computer and use it in GitHub Desktop.
Save dharamsk/ce66dc8ce211d3436034bec4a37e4be4 to your computer and use it in GitHub Desktop.
Redshift: Programmatically configure dist style and sort key of *existing* tables
# I wrote this script to alter 700 Redshift tables to DISTSTYLE ALL (and remove their sort keys),
# but it is partially set up to specify any dist style and sort key on a table-by-table basis;
# all that's needed is to modify the main() function to accept a dict with the table configs.
# Redshift clusters with large node types waste disk space and network bandwidth
# when small tables use the EVEN or KEY (DISTKEY) dist styles.
# Sort keys double the minimum size of a table, which also wastes space.
# See here for more on the minimum table size calculation:
# https://aws.amazon.com/premiumsupport/knowledge-center/redshift-cluster-storage-space/
# KNOWN ISSUE: if a table name is a reserved word (e.g. region), it gets double-quoted in the generated DDL and breaks this script
# KNOWN ISSUE: if there are views or table constraints that depend on the table, this will error
# the admin views used by this script were created from here:
# https://github.com/awslabs/amazon-redshift-utils/tree/master/src/AdminViews
# happy scripting, use at your own risk, etc etc
import sqlalchemy
import os
import time
# Suffix appended to the original table's name when it is renamed out of the
# way; the DROP statement built in alter_table_attributes targets this name.
OLD_TABLE_SUFFIX = '__autostyled_20181013'
# Selects the `ddl` column of admin.v_generate_tbl_ddl for one table.
# Placeholders: {table}, {schema}.
CREATE_SQL = """
select ddl from admin.v_generate_tbl_ddl v
where v.tablename = '{table}'
and v.schemaname = '{schema}'
;
"""
# Selects the grant statements (ddltype = 'grant', excluding functions) that
# admin.v_generate_user_grant_revoke_ddl lists for one table.
# Placeholders: {table}, {schema}.
PERMISSION_SQL = """
select ddl from admin.v_generate_user_grant_revoke_ddl
where objtype != 'Function'
and schemaname = '{schema}'
and ddltype = 'grant'
and objname = '{table}'
;
"""
# Looks up the owner of one table in pg_tables.
# Placeholders: {table}, {schema}.
OWNER_SQL = """
select tableowner
from pg_tables
where tablename = '{table}'
and schemaname = '{schema}'
"""
# Assigns ownership of the new table; the owner name is double-quoted.
# Placeholders: {new_table_name}, {owner}.
NEW_OWNER_SQL = "ALTER TABLE {new_table_name} OWNER TO \"{owner}\";"
# Copies every row from the original table into the new table.
# Placeholders: {new_table_name}, {full_table_name}.
INSERT_SQL = """
insert into {new_table_name}
select * from {full_table_name}
-- select distinct * if you want to dedupe while you're at it,
-- but that will mess with the validation sql results below
;
"""
# Between BEGIN and END: renames the original table to {table}{suffix} and the
# new table to {table}; afterwards runs ANALYZE on {full_table_name}.
# Placeholders: {full_table_name}, {new_table_name}, {table}, {suffix}.
RENAME_SQL = """
BEGIN;
alter table {full_table_name} rename to {table}{suffix};
alter table {new_table_name} rename to {table};
END;
ANALYZE {full_table_name};
"""
# Returns a single boolean: whether the table at {full_table_name} has at
# least as many rows as the table at {full_table_name}{suffix}.
# Placeholders: {full_table_name}, {suffix}.
VALIDATION_SQL = """
select b.new >= a.old
from (select count(*) as old from {full_table_name}{suffix}) a
, (select count(*) as new from {full_table_name}) b
;
"""
# Drops the renamed original table.
# Placeholder: {old_table_with_suffix}.
DROP_SQL = """
drop table {old_table_with_suffix}
;
"""
# Connection URI read from the environment; None when the variable is unset.
# NOTE(review): the variable name looks like a placeholder -- presumably it
# should be replaced with the name of your own environment variable.
CONN_URI = os.environ.get('YOUR_SPECIAL_CONN_URI_VARIABLE_NAME')
def get_engine(CONN_URI):
    """Create an engine for ``CONN_URI`` and return a connection from it.

    Despite the name, the value returned is the result of the engine's
    ``connect()`` call, not the engine itself. A new engine is created on
    every call.
    """
    return sqlalchemy.create_engine(CONN_URI).connect()
def get_prefix():
    """Return ``'n_'`` followed by the current Unix time in whole seconds."""
    epoch_seconds = int(time.time())
    return 'n_' + str(epoch_seconds)
def get_create_sql(table, schema,
                   diststyle, distkey, sortkey,
                   new_table_name):
    """Build the SQL that creates ``new_table_name`` with the requested layout.

    The original table's DDL is read through ``CREATE_SQL``, everything after
    the line consisting of ``)`` is discarded, and the requested distribution
    and sort clauses are appended in its place. Every occurrence of
    ``schema.table`` in the DDL is replaced with ``new_table_name``.

    Args:
        table: name of the existing table.
        schema: schema of the existing table.
        diststyle: ``'ALL'`` or ``'EVEN'`` to emit ``DISTSTYLE <value>``; any
            other value falls through to ``distkey``.
        distkey: column used for ``DISTKEY (...)`` when ``diststyle`` is not
            ``'ALL'``/``'EVEN'``.
        sortkey: text placed inside ``SORTKEY (...)``; falsy for no sort key.
        new_table_name: schema-qualified name of the table to create.

    Returns:
        The statement text, lines joined with newlines.

    Raises:
        ValueError: if neither a supported ``diststyle`` nor a ``distkey`` is
            given, or (from ``list.index``) if the DDL has no ``)`` line.
        Exception: if the DDL query returns fewer than two rows, or if
            replacing the table name changed nothing.
    """
    # Resolve the distribution clause before touching the database so that a
    # bad configuration fails fast with a clear message. Previously `dist`
    # was simply left unassigned when neither branch matched, which surfaced
    # as an UnboundLocalError after the DDL query had already run.
    if diststyle in ('ALL', 'EVEN'):
        dist = 'DISTSTYLE {}'.format(diststyle)
    elif distkey:
        dist = 'DISTKEY ({})'.format(distkey)
    else:
        raise ValueError(
            "diststyle must be 'ALL' or 'EVEN', or a distkey must be given "
            '(got diststyle={!r}, distkey={!r})'.format(diststyle, distkey))

    # get original DDL, one string per row
    q = CREATE_SQL.format(table=table, schema=schema)
    ddl_lines = [str(row[0]) for row in execute(q)]

    # make sure the result generally makes sense
    if len(ddl_lines) < 2:
        print(ddl_lines)
        raise Exception('bad query results')

    # strip off the old dist/sort options: keep everything up to and
    # including the first line that is exactly ')'
    closing_index = ddl_lines.index(')')
    ddl_lines = ddl_lines[:closing_index + 1]

    ddl_lines.append(dist)
    if sortkey:
        ddl_lines.append('SORTKEY ({})'.format(sortkey))
    ddl_lines.append(';')

    # update table name in ddl
    full_table_name = '{}.{}'.format(schema, table)
    renamed_lines = [line.replace(full_table_name, new_table_name)
                     for line in ddl_lines]
    if renamed_lines == ddl_lines:
        # nothing was replaced, so the DDL does not spell the table name as
        # schema.table and the statement would not target the new table
        raise Exception('the mapped results didnt change')

    # NOTE(review): presumably the DDL view emits a commented-out DROP TABLE
    # line; this turns it into a live, idempotent drop -- confirm.
    return '\n'.join(renamed_lines).replace('--DROP TABLE', 'DROP TABLE IF EXISTS')
def get_permissions_sql(table,
                        schema,
                        new_table_name):
    """Build the owner and grant statements to apply to ``new_table_name``.

    The grant statements found through ``PERMISSION_SQL`` for the original
    table are rewritten so that ``schema.table`` reads ``new_table_name``,
    and an ``ALTER TABLE ... OWNER TO`` statement for the original table's
    owner (looked up through ``OWNER_SQL``) is placed in front of them.

    Returns:
        All statements joined with newlines, owner statement first.
    """
    original_name = '{}.{}'.format(schema, table)

    # grants of the original table, re-pointed at the new table
    grant_rows = execute(PERMISSION_SQL.format(table=table, schema=schema))
    statements = [str(row[0]).replace(original_name, new_table_name)
                  for row in grant_rows]

    # owner of the original table becomes the owner of the new one
    owner_row = execute(OWNER_SQL.format(schema=schema, table=table)).fetchone()
    owner_statement = NEW_OWNER_SQL.format(
        new_table_name=new_table_name,
        owner=owner_row[0])

    return '\n'.join([owner_statement] + statements)
def execute(command):
    """Run ``command`` on a fresh connection and return the result object.

    The connection obtained from ``get_engine(CONN_URI)`` is used as a
    context manager, so the ``with`` block has already exited by the time
    the result reaches the caller.
    """
    with get_engine(CONN_URI) as connection:
        result = connection.execute(command)
    return result
def _elapsed_since(start_time):
    # Format the seconds elapsed since start_time for the progress messages.
    return "--- %s seconds ---" % (time.time() - start_time)


def alter_table_attributes(
        table,
        schema,
        diststyle='ALL',
        distkey=None,
        sortkey=None):
    """Rebuild ``schema.table`` with a new dist style / sort key.

    Steps, in order:
      1. create a new table named ``schema.<prefix>_<table>`` from the
         original DDL with the requested distribution and sort clauses;
      2. apply the original owner and grants to it (best effort: a failure
         is reported and the run continues);
      3. copy all rows into it (on failure the new table is dropped and the
         function returns without touching the original);
      4. rename the original to ``<table>`` + ``OLD_TABLE_SUFFIX`` and the
         new table to ``<table>``;
      5. run the row-count validation and, if it passes, drop the renamed
         original.

    The table is skipped (all generated statements are printed, nothing is
    executed) when the CREATE statement still contains the original table
    name, plain or with the table part double-quoted.

    Args:
        table: name of the table to rebuild.
        schema: schema the table lives in.
        diststyle: passed to ``get_create_sql``; defaults to ``'ALL'``.
        distkey: passed to ``get_create_sql``.
        sortkey: passed to ``get_create_sql``.

    Raises:
        Exception: ``'table row count mismatch'`` when validation fails; the
            renamed original table is not dropped in that case.
    """
    new_table_prefix = get_prefix()
    full_table_name = '{}.{}'.format(schema, table)
    quoted_table_name = '{}."{}"'.format(schema, table)
    new_table_name = '{}.{}_{}'.format(schema, new_table_prefix, table)
    old_table_with_suffix = '{}{}'.format(full_table_name, OLD_TABLE_SUFFIX)

    # Get all the commands
    create_command = get_create_sql(
        table=table,
        schema=schema,
        diststyle=diststyle,
        distkey=distkey,
        sortkey=sortkey,
        new_table_name=new_table_name)
    permissions_command = get_permissions_sql(
        table=table,
        schema=schema,
        new_table_name=new_table_name)
    copy_command = INSERT_SQL.format(full_table_name=full_table_name,
                                     new_table_name=new_table_name)
    rename_command = RENAME_SQL.format(full_table_name=full_table_name,
                                       new_table_name=new_table_name,
                                       table=table,
                                       suffix=OLD_TABLE_SUFFIX)
    validation_command = VALIDATION_SQL.format(full_table_name=full_table_name,
                                               suffix=OLD_TABLE_SUFFIX)
    drop_command = DROP_SQL.format(old_table_with_suffix=old_table_with_suffix)

    # If the original name survived the rewrite, the CREATE statement would
    # still touch the original table, so refuse to run anything.
    if quoted_table_name in create_command or full_table_name in create_command:
        print('-------------------- .............. >.>>>>>>>>>>>>>>>>>> SKIPPING TABLE AS TABLE NAME IS RESERVED ##################################')
        print(create_command)
        print(permissions_command)
        print(copy_command)
        print(rename_command)
        print(validation_command)
        print(drop_command)
        return

    print('Creating Table {}'.format(new_table_name))
    start_time = time.time()
    execute(create_command)
    print('Table Created in ' + _elapsed_since(start_time))

    print('Fixing Perms on table {}'.format(new_table_name))
    start_time = time.time()
    try:
        execute(permissions_command)
    except Exception as exc:
        # Best effort, as before: report and carry on. `except Exception`
        # (not a bare except) lets Ctrl-C / SystemExit stop the run, and the
        # message now names the statement that actually failed.
        print('EXCEPTION ----------------------------- >>>>>>>> permissions statement failed')
        print(exc)
        print(permissions_command)
    else:
        print('Perms Fixed in ' + _elapsed_since(start_time))

    start_time = time.time()
    try:
        execute(copy_command)
    except Exception as exc:
        # Undo the only change made so far and leave the original untouched.
        execute('DROP TABLE IF EXISTS {}'.format(new_table_name))
        print('Insert failed, reverting and moving on..')
        print(exc)
        return
    print('Rows Inserted in ' + _elapsed_since(start_time))

    start_time = time.time()
    execute(rename_command)
    print('Tables Renamed in ' + _elapsed_since(start_time))

    # The validation query compares the renamed original against the table
    # now living under the original name, so it can only run after the rename.
    valid = execute(validation_command).first()[0]
    if not valid:
        raise Exception('table row count mismatch')

    start_time = time.time()
    execute(drop_command)
    print('Old Table Dropped in ' + _elapsed_since(start_time))
def main(current_list):
    """Re-style every table named in ``current_list``.

    Each entry must be a ``schema.table`` string. A progress line and a
    timestamp are printed per table before it is processed.

    Raises:
        Exception: ``'table name error'`` when an entry does not split into
            exactly two parts on ``'.'``.
    """
    for position, qualified_name in enumerate(current_list, start=1):
        print('Table ' + str(position) + ': ' + qualified_name)
        print(time.strftime("%Y-%m-%d %H:%M:%S"))

        parts = qualified_name.split('.')
        if len(parts) != 2:
            print(parts)
            raise Exception('table name error')
        schema, table = parts

        # I only used this to remove sort keys and change diststyle to ALL
        alter_table_attributes(table, schema, diststyle='ALL')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment