Created
January 12, 2015 14:53
-
-
Save paveldedik/c3d8ca01bfc8eb3f8db5 to your computer and use it in GitHub Desktop.
Tool for partitioning very large Mergado tables.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
"""
python partitioning.py -c \
"host=localhost dbname=mergado3 password=development user=dev" \
-p public.stats_heureka -t id -i 100
"""
import logging | |
import psycopg2 | |
import partition_data as partman | |
# Send all log records (DEBUG and above) to a log file; the relative
# filename is resolved against the current working directory.
logging.basicConfig(
    filename='output_partitioning.log',
    level=logging.DEBUG,
    format='[%(asctime)s: %(levelname)s/%(processName)s] %(message)s',
)

# Maintenance statements. ``{0}`` is filled in with a table name through
# ``str.format`` (identifiers cannot be passed as query parameters).
VACUUM_SQL = """VACUUM ANALYZE {0};"""
REINDEX_SQL = """REINDEX TABLE {0};"""

# Call of ``create_parent`` in the database schema named ``partman``.
# NOTE: this ``partman`` is a schema name inside the SQL text, unrelated to
# the ``partman`` Python module alias imported by this file.
PARTITION_SQL = """SELECT partman.create_parent(%s, %s, %s, %s, NULL, %s);"""

# Lists the names of all tables in one schema (schema passed as parameter).
TABLES_SQL = """SELECT table_name FROM information_schema.tables
WHERE table_schema=%s;"""

# Values passed (as strings) to ``create_parent`` by ``create_partitions``.
# NOTE(review): presumably the number of partitions created ahead of time and
# the id range covered by each partition -- confirm against pg_partman docs.
PARTITION_PREMAKE = 2
PARTITION_INTERVAL = 20000
def create_conn(connection_args):
    """Open a psycopg2 connection in autocommit mode and return it.

    ``connection_args`` is handed to ``psycopg2.connect`` unchanged.
    """
    connection = psycopg2.connect(connection_args)
    # Every statement is committed immediately. Presumably required because
    # this connection later runs VACUUM, which PostgreSQL refuses to execute
    # inside a transaction block.
    connection.autocommit = True
    return connection
def close_conn(conn):
    """Close the given database connection."""
    conn.close()
def get_tables(conn, config):
    """Yield names of tables in the parent's schema sharing its name prefix.

    ``config.parent`` is expected in ``schema.table`` form. Every table that
    ``information_schema.tables`` lists for that schema and whose name starts
    with the parent table name is yielded (this includes the parent itself).
    Names are yielded bare, without the schema.

    NOTE(review): the prefix match also selects unrelated tables whose names
    merely begin with the parent's name -- confirm that this is acceptable.

    Raises:
        ValueError: on first iteration, if ``config.parent`` contains no dot.
    """
    # maxsplit=1 splits at the first dot only, so the result always unpacks
    # into exactly two names; the previous maxsplit of 2 could produce three
    # parts and break the unpacking.
    schema, table = config.parent.split('.', 1)
    # Fetch everything up front so the cursor is closed before any name is
    # handed to the caller.
    with conn.cursor() as cursor:
        cursor.execute(TABLES_SQL, (schema,))
        rows = cursor.fetchall()
    for table_name, in rows:
        if table_name.startswith(table):
            yield table_name
def _quote_identifier(name):
    """Return ``name`` as a double-quoted SQL identifier."""
    return '"{0}"'.format(name.replace('"', '""'))


def _maintain_tables(conn, config, sql_template, log_message):
    """Run ``sql_template`` once for every table yielded by ``get_tables``.

    The statement is issued against the schema-qualified, quoted table name.
    ``get_tables`` looks tables up in the schema given by ``config.parent``,
    so using bare names would depend on the session's ``search_path`` and
    would fail for names that need quoting (e.g. mixed case).
    """
    schema = config.parent.split('.', 1)[0]
    with conn.cursor() as cursor:
        for table in get_tables(conn, config):
            logging.info(log_message, table)
            qualified = '{0}.{1}'.format(
                _quote_identifier(schema), _quote_identifier(table))
            cursor.execute(sql_template.format(qualified))


def vacuum_tables(conn, config):
    """Run ``VACUUM ANALYZE`` on every table returned by ``get_tables``."""
    _maintain_tables(conn, config, VACUUM_SQL, 'Vacuuming table %s.')


def reindex_tables(conn, config):
    """Run ``REINDEX TABLE`` on every table returned by ``get_tables``."""
    _maintain_tables(conn, config, REINDEX_SQL, 'Reindexing table %s.')
def create_partitions(conn, config):
    """Call ``partman.create_parent`` in the database for ``config.parent``.

    The control column (``'set_id'``) and partition type (``'id-static'``)
    are fixed; the interval and premake values come from the module
    constants ``PARTITION_INTERVAL`` and ``PARTITION_PREMAKE``.
    """
    # Both numbers are sent as strings, as before.
    # NOTE(review): presumably because create_parent declares its interval
    # argument as text -- confirm against the installed pg_partman version.
    premake = str(PARTITION_PREMAKE)
    interval = str(PARTITION_INTERVAL)
    sql_args = (config.parent, 'set_id', 'id-static', interval, premake)
    # The context manager closes the cursor once the call has finished.
    with conn.cursor() as cursor:
        cursor.execute(PARTITION_SQL, sql_args)
def distribute_data(conn, config):
    """Move the data into partitions using the ``partman`` module helpers.

    Steps, in order: look up the partman schema, call
    ``partman.turn_off_autovacuum`` unless ``config.autovacuum_on`` is set,
    run ``partman.partition_data``, log the returned total unless
    ``config.quiet`` is set, run ``partman.vacuum_parent``, and finally call
    ``partman.reset_autovacuum`` if autovacuum was turned off here.
    """
    partman_schema = partman.get_partman_schema(conn)
    manage_autovacuum = not config.autovacuum_on
    if manage_autovacuum:
        partman.turn_off_autovacuum(conn, partman_schema)
    try:
        total = partman.partition_data(conn, partman_schema)
        if not config.quiet:
            logging.info("Total rows moved: %d.", total)
        partman.vacuum_parent(conn)
    finally:
        # Reset even when partitioning or the vacuum fails; otherwise the
        # autovacuum change made above would be left in place.
        if manage_autovacuum:
            partman.reset_autovacuum(conn, partman_schema)
def main():
    """Run the whole partitioning workflow on one database connection.

    Reads the configuration from ``partman.args``, then creates the
    partitions, distributes the data, reindexes and vacuums the tables,
    logging each stage. The connection is closed even if a stage fails;
    the exception itself is propagated to the caller.
    """
    logging.info('Partitioning started.')
    config = partman.args
    conn = create_conn(config.connection)
    try:
        logging.info('Creating partitions.')
        create_partitions(conn, config)
        logging.info('Distributing data.')
        distribute_data(conn, config)
        logging.info('Reindexing tables.')
        reindex_tables(conn, config)
        logging.info('Vacuuming tables.')
        vacuum_tables(conn, config)
    finally:
        # Do not leak the connection when any of the stages raises.
        close_conn(conn)
    logging.info('Partitioning finished.')
if __name__ == '__main__':
    try:
        main()
    except Exception as e:
        # Record the failure with its traceback in the log file, then exit
        # with a non-zero status so that callers (shell, cron, CI) can tell
        # the run failed instead of seeing a silent success.
        logging.exception(e)
        raise SystemExit(1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment