Skip to content

Instantly share code, notes, and snippets.

@take-five
Last active April 1, 2024 22:55
Show Gist options
  • Save take-five/1dab3a99c8636a93fc69f36ff9530b11 to your computer and use it in GitHub Desktop.
Save take-five/1dab3a99c8636a93fc69f36ff9530b11 to your computer and use it in GitHub Desktop.
Fast logical replication initializer for PostgreSQL

Copyright 2020 Glia Inc.

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

#!/usr/bin/env python3
"""
Helper script to create logical replica from standby-server
using combination of manually created logical replication slot
and pg_dump. In comparison to "normal" logical replication setup
this script has the following advantages:
1) Initial snapshot is loaded with pg_dump+pg_restore, which should
be much faster than initial copying of entire database via native
logical replication.
2) The dump is taken with pg_dump on a hot standby, so dumping does not
interfere with normal operation of the primary database.
Requirements:
- This script can be run on any host with installed python3, psycopg2 and
postgresql-client package (to run pg_dump and pg_restore).
- Replication user must be created before running this script.
Example:
setup_logical_replication.py \
-s logical_repl_sub \
-p logical_repl_pub \
--primary-conn-str "host=10.0.0.1 port=5432 dbname=mydb user=repuser password=Secret" \
--secondary-conn-str "host=10.0.0.2 port=5432 dbname=mydb user=repuser password=Secret" \
--target-conn-str "host=10.1.0.100 port=5432 dbname=mytargetdb user=adminuser password=1234" \
--dump-dir /data/dump
Inspired by https://www.postgresql-archive.org/logical-replication-initiate-via-manual-pg-dump-td6085971.html#a6086220
"""
import optparse
import logging
import sys
import os
import subprocess
import shlex
import shutil
import psycopg2
from psycopg2 import sql
from contextlib import contextmanager
class LogicalReplicationInitializer:
    """Set up a logical replica seeded from a pg_dump taken under an exported snapshot.

    The workflow (see ``start``) is: drop a leftover subscription on the
    target, ensure the publication exists on the primary, (re)create the
    logical replication slot on the primary, export a snapshot on the
    secondary and dump it with pg_dump, restore the dump on the target,
    create a disabled subscription bound to the existing slot, advance the
    subscription's replication origin to the LSN captured together with the
    snapshot, and finally enable the subscription.
    """

    def __init__(self, args):
        """Parse command line arguments and configure logging.

        Args:
            args: list of command line arguments (without the program name).

        Exits via ``parser.error`` (SystemExit) when a mandatory option
        (--primary-conn-str, --target-conn-str, --dump-dir) is missing or
        when --jobs is not a positive integer.
        """
        parser = self.init_optparse()
        options, self.args = parser.parse_args(args)

        # Mandatory options; parser.error() prints usage and exits.
        if not options.primary_conn_str:
            parser.error("--primary-conn-str is missing")
        if not options.target_conn_str:
            parser.error("--target-conn-str is missing")
        if not options.dump_dir:
            parser.error("--dump-dir is missing")
        if options.jobs < 1:
            parser.error("--jobs must be a positive integer")

        self.primary_conn_str = options.primary_conn_str
        self.target_conn_str = options.target_conn_str
        self.dump_dir = options.dump_dir
        self.jobs = options.jobs

        # Optional connection strings fall back to the primary's one.
        self.secondary_conn_str = (
            options.secondary_conn_str or self.primary_conn_str
        )
        self.subscription_conn_str = (
            options.subscription_conn_str or self.primary_conn_str
        )

        self.slot = options.slot_name
        self.publication_name = options.publication_name

        # Connections are opened lazily by connect(); keeping the attributes
        # defined lets close() run safely even if connect() failed midway.
        self.primary = None
        self.secondary = None
        self.target = None

        logging.basicConfig(level=logging.INFO,
                            format='[%(asctime)s] %(levelname)s %(message)s')
        self.log = logging.getLogger()

    def init_optparse(self):
        """Build and return the optparse parser describing all CLI options."""
        parser = optparse.OptionParser()
        parser.add_option("-s",
                          "--slot-name",
                          dest="slot_name",
                          default="migration_sub",
                          help="Replication slot name")
        parser.add_option("-p",
                          "--publication-name",
                          dest="publication_name",
                          default="migration_pub",
                          help="Publication name")
        parser.add_option("-P",
                          "--primary-conn-str",
                          dest="primary_conn_str",
                          help="Primary DB connection string")
        parser.add_option("-S",
                          "--secondary-conn-str",
                          dest="secondary_conn_str",
                          help="Secondary DB connection string")
        parser.add_option("-T",
                          "--target-conn-str",
                          dest="target_conn_str",
                          help="Target DB connection string")
        parser.add_option("-C",
                          "--subscription-conn-str",
                          dest="subscription_conn_str",
                          help="Primary DB connection string to be used on the target DB")
        parser.add_option("-d",
                          "--dump-dir",
                          dest="dump_dir",
                          help="pg_dump target directory")
        parser.add_option("-j",
                          "--jobs",
                          dest="jobs",
                          type="int",
                          default=4,
                          help="Number of parallel pg_dump/pg_restore jobs")
        return parser

    def start(self):
        """Run the whole initialization workflow.

        Any exception is logged and turned into ``sys.exit(1)``. Database
        connections are closed in all cases.
        """
        try:
            self.connect()

            # Drop existing subscription if previous attempt failed
            self.drop_subscription()
            self.create_publication()
            self.create_logical_replication_slot()

            # Export snapshot in the separate transaction and while
            # it's open, dump the database
            with self.with_snapshot() as (lsn, snapshot):
                self.dump(snapshot)

            self.restore_dump()
            self.create_subscription()
            self.advance_subscription_origin(lsn)
            self.enable_subscription()
        except Exception as err:
            self.log.error("%s", err)
            sys.exit(1)
        finally:
            # Runs on success and on the sys.exit(1) path alike.
            self.close()

    def connect(self):
        """Open connections to the primary, secondary and target databases."""
        self.primary = psycopg2.connect(self.primary_conn_str)
        self.secondary = psycopg2.connect(self.secondary_conn_str)
        self.target = psycopg2.connect(self.target_conn_str)

    def close(self):
        """Close every connection that was opened; safe to call repeatedly."""
        for attr in ("primary", "secondary", "target"):
            conn = getattr(self, attr, None)
            if conn is None:
                continue
            try:
                conn.close()
            except Exception as err:
                # Best effort: a failing close must not hide the real outcome.
                self.log.warning("Failed to close %s connection: %s", attr, err)
            setattr(self, attr, None)

    def create_publication(self):
        """Create the FOR ALL TABLES publication on the primary if it is missing."""
        with self.primary.cursor() as cur:
            cur.execute(
                "SELECT COUNT(*) FROM pg_publication WHERE pubname = %s",
                (self.publication_name,)
            )
            (cnt,) = cur.fetchone()
            if cnt == 0:
                pub_name = sql.Identifier(self.publication_name)
                query = sql.SQL("CREATE PUBLICATION {} FOR ALL TABLES").format(pub_name)
                cur.execute(query)
                self.log.info("Created publication %s", self.publication_name)
            else:
                self.log.info("Publication %s already exists", self.publication_name)
        self.primary.commit()

    def create_logical_replication_slot(self):
        """Drop the slot on the primary if it exists, then create it with pgoutput."""
        with self.primary.cursor() as cur:
            # Selecting from pg_replication_slots makes the drop a no-op
            # when no slot with that name exists.
            cur.execute("""
                SELECT pg_drop_replication_slot(slot_name)
                FROM pg_replication_slots
                WHERE slot_name = %s
            """, (self.slot,))
            cur.execute(
                "SELECT pg_create_logical_replication_slot(%s, %s)",
                (self.slot, "pgoutput")
            )
            (res,) = cur.fetchone()
        self.primary.commit()
        self.log.info("Created logical replication slot %s", res)

    @contextmanager
    def with_snapshot(self):
        """Export a snapshot on the secondary and keep it alive for the body.

        Yields:
            A ``(lsn, snapshot)`` row: the last replayed WAL LSN when the
            server is in recovery (otherwise the current WAL LSN), and the
            identifier returned by ``pg_export_snapshot()``.

        The transaction holding the snapshot is rolled back when the block
        exits, so it does not stay open while later steps run.
        """
        self.secondary.set_session(isolation_level='REPEATABLE READ')
        try:
            with self.secondary.cursor() as cur:
                cur.execute("""
                    SELECT
                    CASE
                    WHEN pg_is_in_recovery() THEN pg_last_wal_replay_lsn()
                    ELSE pg_current_wal_lsn()
                    END,
                    pg_export_snapshot()
                """)
                yield cur.fetchone()
        finally:
            # Release the exported snapshot; skip if the connection is
            # already gone so the original error is not masked.
            if not self.secondary.closed:
                self.secondary.rollback()

    def dump(self, snapshot):
        """Dump the secondary DB into ``self.dump_dir`` using the given snapshot.

        Raises:
            subprocess.CalledProcessError: if pg_dump exits with non-zero status.
        """
        self.log.info("Dumping source DB")
        subprocess.run([
            "pg_dump",
            "--no-publication",
            "--no-subscription",
            "--snapshot=" + snapshot,
            "--format=d",
            "--jobs=%d" % getattr(self, "jobs", 4),
            "-f", self.dump_dir,
            self.secondary_conn_str
        ], check=True)

    def restore_dump(self):
        """Restore the directory-format dump into the target DB with pg_restore.

        A non-zero pg_restore exit status is reported as a warning and does
        not abort the workflow, matching the previous behaviour of carrying
        on after the restore step.
        """
        self.log.info("Restoring the dump on the target DB")
        result = subprocess.run([
            "pg_restore",
            "--clean",
            "--if-exists",
            "-Fd",
            "--jobs=%d" % getattr(self, "jobs", 4),
            "-d", self.target_conn_str,
            self.dump_dir
        ])
        if result.returncode != 0:
            self.log.warning(
                "pg_restore exited with status %d, review its output",
                result.returncode
            )

    def create_subscription(self):
        """Create a disabled subscription on the target bound to the existing slot.

        The subscription is named after the slot, does not create a slot and
        does not copy data.
        """
        self.log.info("Creating subscription %s", self.slot)
        with self.target.cursor() as cur:
            pub_name = sql.Identifier(self.publication_name)
            sub_name = sql.Identifier(self.slot)
            query = sql.SQL("""
                CREATE SUBSCRIPTION {}
                CONNECTION %s
                PUBLICATION {}
                WITH (create_slot=false, slot_name={}, enabled=false, copy_data=false)
            """).format(sub_name, pub_name, sub_name)
            cur.execute(query, (self.subscription_conn_str,))
        self.target.commit()

    def advance_subscription_origin(self, lsn):
        """Advance the subscription's replication origin on the target to ``lsn``.

        Raises:
            RuntimeError: if no subscription named ``self.slot`` is found, in
                which case nothing would have been advanced.
        """
        self.log.info("Setting replication origin position to %s", lsn)
        with self.target.cursor() as cur:
            cur.execute("""
                SELECT
                pg_replication_origin_advance('pg_' || subid::text, %s)
                FROM
                pg_stat_subscription pss
                WHERE
                pss.subname = %s
            """, (lsn, self.slot))
            # No row means the WHERE clause matched nothing and the origin
            # was left untouched; enabling the subscription would then be wrong.
            if cur.fetchone() is None:
                raise RuntimeError(
                    "Subscription %s not found, replication origin was not advanced"
                    % self.slot
                )
        self.target.commit()

    def enable_subscription(self):
        """Enable the subscription on the target."""
        self.log.info("Enabling subscription %s", self.slot)
        with self.target.cursor() as cur:
            sub_name = sql.Identifier(self.slot)
            query = sql.SQL("ALTER SUBSCRIPTION {} ENABLE").format(sub_name)
            cur.execute(query)
        self.target.commit()

    def drop_subscription(self):
        """Drop the subscription on the target if it exists.

        The statement is executed in autocommit mode; the connection is put
        back into transactional mode afterwards, even on failure.
        """
        self.target.autocommit = True
        try:
            with self.target.cursor() as cur:
                sub_name = sql.Identifier(self.slot)
                query = sql.SQL("DROP SUBSCRIPTION IF EXISTS {}").format(sub_name)
                cur.execute(query)
        finally:
            self.target.autocommit = False
if __name__ == "__main__":
    # Script entry point: build the initializer from the command line
    # arguments (program name excluded) and run the full workflow.
    LogicalReplicationInitializer(sys.argv[1:]).start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment