Skip to content

Instantly share code, notes, and snippets.

@peterwj
Created January 27, 2022 22:04
Show Gist options
  • Save peterwj/0614bf6b6fe339a3cbd42eb93dc5b37a to your computer and use it in GitHub Desktop.
Save peterwj/0614bf6b6fe339a3cbd42eb93dc5b37a to your computer and use it in GitHub Desktop.
Glue code to quickly copy data from one Postgres table to another
#!/usr/bin/env python3
import click
import queue
import sys
import os
import threading
q = queue.Queue()
EXPECTED_DEST_DB_HOST = "host=ur-db.postgres.database"
@click.command()
@click.option("--table-name", help="Name of table to migrate", required=True)
@click.option(
"--threads",
"-j",
help="Number of threads to run in parallel",
type=int,
)
@click.option(
"--table-size", help="Number of rows in the table", type=int, required=True
)
@click.option("--source-url", help="URL of the source database", required=True)
@click.option("--dest-url", help="URL of the source database", required=True)
@click.option("--dry-run/--no-dry-run", default=True, help="Is this a dry run")
@click.option("--batch-size", default=1000, type=int)
@click.option("--start", default=0, type=int, help="Lowest ID to migrate")
def migrate_table(
dest_url: str,
source_url: str,
table_name: str,
threads: int,
table_size: int,
dry_run: bool,
batch_size: int,
start: int,
) -> None:
"""Copy the specified table from the database at source_url to the database at
dest_url. The copy happens in parallel (according to the value of the --threads
param) using os.system.
"""
if EXPECTED_DEST_DB_HOST not in dest_url:
print("Error: dest database is unexpected")
sys.exit(1)
interval = batch_size
end = start + interval
while start < table_size:
task = {
"dry_run": dry_run,
"table_name": table_name,
"source_url": source_url,
"dest_url": dest_url,
"start": start,
"end": end,
}
q.put(task)
start = end
end += interval
print("starting workers")
for _ in range(threads):
threading.Thread(target=worker, daemon=True).start()
print("waiting..")
q.join()
print("done")
def worker():
print("booting a worker")
while True:
item = q.get()
dry_run = item["dry_run"]
table_name = item["table_name"]
source_url = item["source_url"]
dest_url = item["dest_url"]
start = item["start"]
end = item["end"]
if EXPECTED_DEST_DB_HOST not in dest_url:
print("Error: dest database is unexpected")
return
select_query = f"COPY (SELECT * FROM {table_name} WHERE id >= {start} AND id < {end}) TO STDOUT"
read_command = f'psql "{source_url}" -c "{select_query}"'
write_command = f'psql "{dest_url}" -c "COPY {table_name} FROM STDIN"'
command = f"PGOPTIONS='--statement-timeout=0' {read_command} | PGOPTIONS='--statement-timeout=0' {write_command}"
if dry_run:
print(f"would run:\n\t{command}")
else:
os.system(command)
print(f"migrated [{start}, {end})")
q.task_done()
if __name__ == "__main__":
migrate_table()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment