Skip to content

Instantly share code, notes, and snippets.

@paveldedik
Created January 12, 2015 14:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save paveldedik/c3d8ca01bfc8eb3f8db5 to your computer and use it in GitHub Desktop.
Tool for partitioning very large Mergado tables.
# -*- coding: utf-8 -*-
"""
Tool for partitioning very large tables with pg_partman.

Usage::

python partitioning.py -c \
"host=localhost dbname=mergado3 password=development user=dev"
-p public.stats_heureka -t id -i 100
"""
import logging
import psycopg2
# Project-local helper wrapping pg_partman (CLI args, partition_data,
# vacuum_parent, autovacuum toggles).
import partition_data as partman
# Log everything (DEBUG and up) to a file; nothing goes to stdout.
logging.basicConfig(
filename='output_partitioning.log',
level=logging.DEBUG,
format='[%(asctime)s: %(levelname)s/%(processName)s] %(message)s',
)
# Per-table maintenance statements.  Table names are interpolated with
# str.format; here they come from information_schema (see get_tables),
# not from user input.
VACUUM_SQL = """VACUUM ANALYZE {0};"""
REINDEX_SQL = """REINDEX TABLE {0};"""
# NOTE(review): argument order assumed to match the tuple built in
# create_partitions below — confirm against the installed pg_partman version.
PARTITION_SQL = """SELECT partman.create_parent(%s, %s, %s, %s, NULL, %s);"""
TABLES_SQL = """SELECT table_name FROM information_schema.tables
WHERE table_schema=%s;"""
# How many future partitions to pre-create, and how many ids per partition.
PARTITION_PREMAKE = 2
PARTITION_INTERVAL = 20000
def create_conn(connection_args):
    """Open a psycopg2 connection in autocommit mode.

    :param connection_args: libpq-style DSN string, e.g.
        ``"host=... dbname=... user=..."``.
    :return: an open connection with ``autocommit`` enabled
        (presumably needed so VACUUM can run outside a transaction
        block — NOTE(review): confirm).
    """
    connection = psycopg2.connect(connection_args)
    connection.autocommit = True
    return connection
def close_conn(conn):
    """Close the given database connection."""
    conn.close()
def get_tables(conn, config):
    """Yield the names of the parent table and its partitions.

    Lists every table in the parent's schema and yields those whose
    name starts with the parent table's name.

    :param conn: open psycopg2 connection.
    :param config: parsed CLI options; ``config.parent`` is a
        ``schema.table`` qualified name.
    """
    cursor = conn.cursor()
    # Split on the FIRST dot only.  The original maxsplit=2 would
    # produce three parts for a name containing two dots and make the
    # two-value unpacking raise ValueError.
    schema, table = config.parent.split('.', 1)
    cursor.execute(TABLES_SQL, (schema,))
    for table_name, in cursor.fetchall():
        if table_name.startswith(table):
            yield table_name
def vacuum_tables(conn, config):
    """Run ``VACUUM ANALYZE`` on the parent table and each partition."""
    cursor = conn.cursor()
    for name in get_tables(conn, config):
        logging.info('Vacuuming table %s.', name)
        statement = VACUUM_SQL.format(name)
        cursor.execute(statement)
def reindex_tables(conn, config):
    """Rebuild all indexes on the parent table and each partition."""
    cursor = conn.cursor()
    for name in get_tables(conn, config):
        logging.info('Reindexing table %s.', name)
        statement = REINDEX_SQL.format(name)
        cursor.execute(statement)
def create_partitions(conn, config):
    """Register the parent table with pg_partman for id-based partitioning.

    Calls ``partman.create_parent`` with the configured parent table,
    the ``set_id`` control column, the ``id-static`` partition type,
    and the module-level interval/premake settings.
    """
    cursor = conn.cursor()
    create_parent_args = (
        config.parent,
        'set_id',
        'id-static',
        str(PARTITION_INTERVAL),
        str(PARTITION_PREMAKE),
    )
    cursor.execute(PARTITION_SQL, create_parent_args)
def distribute_data(conn, config):
    """Move existing rows from the parent table into its partitions.

    Optionally disables autovacuum while the rows are moved (and
    restores it afterwards), then vacuums the parent table.
    """
    partman_schema = partman.get_partman_schema(conn)
    autovacuum_disabled = not config.autovacuum_on
    if autovacuum_disabled:
        partman.turn_off_autovacuum(conn, partman_schema)
    total = partman.partition_data(conn, partman_schema)
    if not config.quiet:
        logging.info("Total rows moved: %d.", total)
    partman.vacuum_parent(conn)
    if autovacuum_disabled:
        partman.reset_autovacuum(conn, partman_schema)
def main():
    """Run the whole partitioning pipeline end to end.

    Creates partitions, moves existing rows into them, reindexes and
    finally vacuums every affected table.
    """
    logging.info('Partitioning started.')
    config = partman.args
    conn = create_conn(config.connection)
    # Each step is logged just before it runs, in this exact order.
    steps = (
        ('Creating partitions.', create_partitions),
        ('Distributing data.', distribute_data),
        ('Reindexing tables.', reindex_tables),
        ('Vacuuming tables.', vacuum_tables),
    )
    for message, step in steps:
        logging.info(message)
        step(conn, config)
    close_conn(conn)
    logging.info('Partitioning finished.')
if __name__ == '__main__':
    # Top-level boundary: log any unhandled error with its traceback
    # instead of letting it print to stderr.
    try:
        main()
    except Exception as exc:
        logging.exception(exc)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment