#!/usr/bin/env python3

"""
Helper script to create a logical replica from a standby server using a
combination of a manually created logical replication slot and pg_dump.
Compared to a "normal" logical replication setup, this script has the
following advantages:

1) The initial snapshot is loaded with pg_dump + pg_restore, which should
   be much faster than the initial copy of the entire database via native
   logical replication.

2) pg_dump runs against the hot standby, which makes sure that the dump
   does not interfere with normal database operation on the primary.

Requirements:

- This script can be run on any host with python3, psycopg2 and the
  postgresql-client package (for pg_dump and pg_restore) installed.

- The replication user must be created before running this script
  (an example follows right below this docstring).

Example:

setup_logical_replication.py \
    -s logical_repl_sub \
    -p logical_repl_pub \
    --primary-conn-str "host=10.0.0.1 port=5432 dbname=mydb user=repuser password=Secret" \
    --secondary-conn-str "host=10.0.0.2 port=5432 dbname=mydb user=repuser password=Secret" \
    --target-conn-str "host=10.1.0.100 port=5432 dbname=mytargetdb user=adminuser password=1234" \
    --dump-dir /data/dump

Inspired by https://www.postgresql-archive.org/logical-replication-initiate-via-manual-pg-dump-td6085971.html#a6086220
"""

import optparse
import logging
import sys
import subprocess

import psycopg2
from psycopg2 import sql

from contextlib import contextmanager


class LogicalReplicationInitializer:
    def __init__(self, args):
        # Parse command line arguments
        parser = self.init_optparse()
        options, self.args = parser.parse_args(args)

        if options.primary_conn_str:
            self.primary_conn_str = options.primary_conn_str
        else:
            parser.error("--primary-conn-str is missing")

        if options.secondary_conn_str:
            self.secondary_conn_str = options.secondary_conn_str
        else:
            self.secondary_conn_str = self.primary_conn_str

        if options.target_conn_str:
            self.target_conn_str = options.target_conn_str
        else:
            parser.error("--target-conn-str is missing")

        if options.subscription_conn_str:
            self.subscription_conn_str = options.subscription_conn_str
        else:
            self.subscription_conn_str = self.primary_conn_str

        if options.dump_dir:
            self.dump_dir = options.dump_dir
        else:
            parser.error("--dump-dir is missing")

        self.slot = options.slot_name
        self.publication_name = options.publication_name

        logging.basicConfig(level=logging.INFO,
                            format='[%(asctime)s] %(levelname)s %(message)s')

        self.log = logging.getLogger()

    def init_optparse(self):
        parser = optparse.OptionParser()
        parser.add_option("-s",
                          "--slot-name",
                          dest="slot_name",
                          default="migration_sub",
                          help="Replication slot (and subscription) name")
        parser.add_option("-p",
                          "--publication-name",
                          dest="publication_name",
                          default="migration_pub",
                          help="Publication name")
        parser.add_option("-P",
                          "--primary-conn-str",
                          dest="primary_conn_str",
                          help="Primary DB connection string")
        parser.add_option("-S",
                          "--secondary-conn-str",
                          dest="secondary_conn_str",
                          help="Secondary (standby) DB connection string, used for pg_dump")
        parser.add_option("-T",
                          "--target-conn-str",
                          dest="target_conn_str",
                          help="Target DB connection string")
        parser.add_option("-C",
                          "--subscription-conn-str",
                          dest="subscription_conn_str",
                          help="Connection string to the primary DB as used by "
                               "the subscription on the target DB")
        parser.add_option("-d",
                          "--dump-dir",
                          dest="dump_dir",
                          help="pg_dump target directory")
        return parser

    def start(self):
        try:
            self.connect()
            # Drop an existing subscription in case a previous attempt failed
            self.drop_subscription()
            self.create_publication()
            self.create_logical_replication_slot()

            # Export a snapshot in a separate transaction and, while it is
            # still open, dump the database as of that snapshot
            with self.with_snapshot() as (lsn, snapshot):
                self.dump(snapshot)

            self.restore_dump()
            self.create_subscription()
            self.advance_subscription_origin(lsn)
            self.enable_subscription()
        except Exception as err:
            self.log.error("%s", err)
            sys.exit(1)

    def connect(self):
        self.primary = psycopg2.connect(self.primary_conn_str)
        self.secondary = psycopg2.connect(self.secondary_conn_str)
        self.target = psycopg2.connect(self.target_conn_str)

    def create_publication(self):
        with self.primary.cursor() as cur:
            cur.execute("SELECT COUNT(*) FROM pg_publication WHERE pubname = %s",
                        (self.publication_name,))
            (cnt,) = cur.fetchone()

            if cnt == 0:
                pub_name = sql.Identifier(self.publication_name)
                query = sql.SQL("CREATE PUBLICATION {} FOR ALL TABLES").format(pub_name)

                cur.execute(query)

                self.log.info("Created publication %s", self.publication_name)
            else:
                self.log.info("Publication %s already exists", self.publication_name)

        self.primary.commit()

    def create_logical_replication_slot(self):
        with self.primary.cursor() as cur:
            # Drop a leftover slot from a previous (failed) attempt, if any
            cur.execute("""
                SELECT pg_drop_replication_slot(slot_name)
                FROM pg_replication_slots
                WHERE slot_name = %s
            """, (self.slot,))

            # Create the slot with the pgoutput plugin, which is what the
            # subscription created later expects
            cur.execute(
                "SELECT pg_create_logical_replication_slot(%s, %s)",
                (self.slot, "pgoutput")
            )
            (res,) = cur.fetchone()

        self.primary.commit()
        self.log.info("Created logical replication slot %s", res)

    @contextmanager
    def with_snapshot(self):
        # pg_export_snapshot() must run inside a transaction; REPEATABLE READ
        # keeps the exported snapshot stable for the duration of the dump
        self.secondary.set_session(isolation_level='REPEATABLE READ')

        with self.secondary.cursor() as cur:
            # Record the LSN the snapshot corresponds to: the last replayed
            # LSN on a standby, the current WAL position on a primary. The
            # subscription origin is later advanced to exactly this LSN.
            cur.execute("""
                SELECT
                    CASE
                        WHEN pg_is_in_recovery() THEN pg_last_wal_replay_lsn()
                        ELSE pg_current_wal_lsn()
                    END,
                    pg_export_snapshot()
            """)

            try:
                yield cur.fetchone()
            finally:
                # Close the exporting transaction once the dump has finished
                self.secondary.rollback()

    def dump(self, snapshot):
        self.log.info("Dumping source DB")

        # Dump using the exported snapshot so that the data matches the LSN
        # recorded above; publications and subscriptions are excluded because
        # they are created separately on the target
        subprocess.run([
            "pg_dump",
            "--no-publications",
            "--no-subscriptions",
            "--snapshot=" + snapshot,
            "--format=d",
            "--jobs=4",
            "-f", self.dump_dir,
            self.secondary_conn_str
        ], check=True)
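
    # For reference, the call above is roughly equivalent to the following
    # command line (illustrative values; the snapshot name is whatever
    # pg_export_snapshot() returned):
    #
    #   pg_dump --no-publications --no-subscriptions \
    #       --snapshot=00000003-0000001B-1 --format=d --jobs=4 \
    #       -f /data/dump "host=10.0.0.2 port=5432 dbname=mydb user=repuser"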

    def restore_dump(self):
        self.log.info("Restoring the dump on the target DB")

        # The exit status is not checked: pg_restore may exit non-zero on
        # errors that are harmless in this context (e.g. pre-existing objects
        # reported while restoring with --clean)
        subprocess.run([
            "pg_restore",
            "--clean",
            "--if-exists",
            "-Fd",
            "--jobs=4",
            "-d", self.target_conn_str,
            self.dump_dir
        ])

    def create_subscription(self):
        self.log.info("Creating subscription %s", self.slot)

        with self.target.cursor() as cur:
            pub_name = sql.Identifier(self.publication_name)
            sub_name = sql.Identifier(self.slot)

            # create_slot=false / slot_name: reuse the manually created slot;
            # copy_data=false: the initial data was already loaded via pg_restore;
            # enabled=false: streaming only starts after the replication origin
            # has been advanced to the LSN of the dump
            query = sql.SQL("""
                CREATE SUBSCRIPTION {}
                CONNECTION %s
                PUBLICATION {}
                WITH (create_slot=false, slot_name={}, enabled=false, copy_data=false)
            """).format(sub_name, pub_name, sub_name)

            cur.execute(query, (self.subscription_conn_str,))

        self.target.commit()

    def advance_subscription_origin(self, lsn):
        self.log.info("Setting replication origin position to %s", lsn)

        # The replication origin of a subscription is named 'pg_' || subid.
        # Advancing it to the LSN captured together with the snapshot makes
        # the subscription skip the changes already contained in the dump and
        # start streaming right after it.
        with self.target.cursor() as cur:
            cur.execute("""
                SELECT
                    pg_replication_origin_advance('pg_' || subid::text, %s)
                FROM
                    pg_stat_subscription pss
                WHERE
                    pss.subname = %s
            """, (lsn, self.slot))

        self.target.commit()
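
    # For reference: the origin position set above can be inspected on the
    # target (column names as of recent PostgreSQL versions) with
    #
    #   SELECT external_id, remote_lsn FROM pg_replication_origin_status;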

    def enable_subscription(self):
        self.log.info("Enabling subscription %s", self.slot)

        with self.target.cursor() as cur:
            sub_name = sql.Identifier(self.slot)
            query = sql.SQL("ALTER SUBSCRIPTION {} ENABLE").format(sub_name)
            cur.execute(query)

        self.target.commit()

    def drop_subscription(self):
        # DROP SUBSCRIPTION cannot run inside a transaction block when it has
        # to drop the remote replication slot, hence autocommit
        self.target.autocommit = True

        with self.target.cursor() as cur:
            sub_name = sql.Identifier(self.slot)
            query = sql.SQL("DROP SUBSCRIPTION IF EXISTS {}").format(sub_name)
            cur.execute(query)

        self.target.autocommit = False


if __name__ == "__main__":
    script = LogicalReplicationInitializer(sys.argv[1:])
    script.start()
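
# One possible way to verify that replication is flowing after the script has
# finished (exact columns vary between PostgreSQL versions):
#
#   -- on the target:
#   SELECT subname, received_lsn, latest_end_lsn FROM pg_stat_subscription;
#
#   -- on the primary:
#   SELECT slot_name, active, confirmed_flush_lsn FROM pg_replication_slots;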