Skip to content

Instantly share code, notes, and snippets.

@mjepronk
Created April 7, 2020 15:15
Show Gist options
  • Save mjepronk/eb103fc4fb3c9bda30954992d1ff8cf1 to your computer and use it in GitHub Desktop.
Save mjepronk/eb103fc4fb3c9bda30954992d1ff8cf1 to your computer and use it in GitHub Desktop.
Dump a PostgreSQL database to a file, import a database from a file, or copy one database to another.
#!/usr/bin/env python3
import os
import sys
import shutil
import argparse
import getpass
import datetime
import subprocess
# Absolute paths of the PostgreSQL client programs this script runs.
PSQL_BIN = "/usr/bin/psql"
PGDUMP_BIN = "/usr/bin/pg_dump"
CREATEDB_BIN = "/usr/bin/createdb"
DROPDB_BIN = "/usr/bin/dropdb"
# Directory in which the timestamped dump files are created.
TMP_DIR = "/var/tmp"
def gettmpfile(db, tmpdir):
    """Build the path of a timestamped SQL dump file for a database.

    Args:
        db: Database name; used as the file name prefix.
        tmpdir: Directory the returned path points into.

    Returns:
        ``<tmpdir>/<db>-<YYYYmmddHHMMSS>.sql``, using the current local time.
        The file itself is not created.
    """
    stamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
    filename = "{0}-{1}.sql".format(db, stamp)
    return os.path.join(tmpdir, filename)
def dbdump(db, dumpfile):
    """Dump database *db* as UTF-8 SQL, without ownership commands, to *dumpfile*.

    Args:
        db: Name of the database to dump.
        dumpfile: Path of the file to write; created or truncated.

    Raises:
        subprocess.CalledProcessError: If pg_dump exits with a non-zero status.
        OSError: If *dumpfile* cannot be opened for writing.
    """
    # Run pg_dump with an argument list instead of a shell command line, so
    # that database names or paths containing spaces or shell metacharacters
    # are passed through verbatim rather than interpreted by the shell.
    with open(dumpfile, 'wb') as out:
        subprocess.check_call(
            [PGDUMP_BIN, '-E', 'UTF8', '--no-owner', db], stdout=out)
def dbimport(db, dumpfile):
    """Feed the SQL in *dumpfile* to psql connected to database *db*.

    Args:
        db: Name of the database to import into.
        dumpfile: Path of the SQL file to read.

    Raises:
        subprocess.CalledProcessError: If psql exits with a non-zero status.
        OSError: If *dumpfile* cannot be opened for reading.
    """
    # The dump is supplied on psql's standard input, as the former shell
    # redirection did, but without a shell: names containing spaces or shell
    # metacharacters can no longer break or alter the command.
    with open(dumpfile, 'rb') as source:
        subprocess.check_call([PSQL_BIN, db], stdin=source)
def dbcreate(db, owner):
    """Create the UTF-8 encoded database *db* owned by role *owner*.

    Args:
        db: Name of the database to create.
        owner: Role passed to createdb as the owner (``-O``).

    Raises:
        subprocess.CalledProcessError: If createdb exits with a non-zero status.
    """
    # Argument list instead of a shell string: no word splitting and no
    # shell injection through the database or role name.
    subprocess.check_call([CREATEDB_BIN, '-O', owner, '-E', 'UTF-8', db])
def dbdrop(db):
    """Drop database *db* after asking the server to terminate its sessions.

    Args:
        db: Name of the database to drop.

    Raises:
        subprocess.CalledProcessError: If the disconnect query or dropdb
            exits with a non-zero status.
    """
    dbdisconnectusers(db)
    # Argument list instead of a shell string: no word splitting and no
    # shell injection through the database name.
    subprocess.check_call([DROPDB_BIN, db])
def dbquery(query, db=None):
    """Run a single SQL command through psql and return its raw output.

    psql is invoked with ``-t -A`` (tuples only, unaligned), so the output
    holds just the result values.

    Args:
        query: SQL text passed to psql with ``-c``.
        db: Optional database to connect to (``-d``); when omitted or empty,
            psql chooses the database itself.

    Returns:
        The bytes psql wrote to standard output.

    Raises:
        subprocess.CalledProcessError: If psql exits with a non-zero status.
    """
    argv = [PSQL_BIN, '-t', '-A', '-c', query]
    if db:
        argv += ['-d', db]
    return subprocess.check_output(argv)
def dbexists(db):
    """Return True if a database named *db* is listed in ``pg_database``.

    Args:
        db: Database name to look up.

    Returns:
        True when the lookup query yields ``1``, False when it yields nothing.

    Raises:
        subprocess.CalledProcessError: If psql exits with a non-zero status.
    """
    # Double embedded single quotes so the name always stays inside the SQL
    # string literal; a name such as "o'brien" previously broke the query.
    # NOTE(review): assumes the server default standard_conforming_strings=on.
    literal = db.replace("'", "''")
    query = "SELECT 1 FROM pg_database WHERE datname = '{0}';".format(literal)
    # Empty output (no matching row) is treated as 0.
    return int(dbquery(query).strip() or 0) == 1
def dbroleexists(role):
    """Return True if a role named *role* is listed in ``pg_roles``.

    Args:
        role: Role name to look up.

    Returns:
        True when the lookup query yields ``1``, False when it yields nothing.

    Raises:
        subprocess.CalledProcessError: If psql exits with a non-zero status.
    """
    # Double embedded single quotes so the name always stays inside the SQL
    # string literal instead of terminating it early.
    # NOTE(review): assumes the server default standard_conforming_strings=on.
    literal = role.replace("'", "''")
    query = "SELECT 1 FROM pg_roles WHERE rolname = '{0}';".format(literal)
    # Empty output (no matching row) is treated as 0.
    return int(dbquery(query).strip() or 0) == 1
def dbdisconnectusers(db):
    """Ask the server to terminate every other session connected to *db*.

    Args:
        db: Name of the database whose sessions should be terminated.

    Returns:
        The raw psql output of the ``pg_terminate_backend`` query.

    Raises:
        subprocess.CalledProcessError: If psql exits with a non-zero status.
    """
    # This doesn't seem to work...
    # Double embedded single quotes so the name always stays inside the SQL
    # string literal instead of terminating it early.
    # NOTE(review): assumes the server default standard_conforming_strings=on.
    literal = db.replace("'", "''")
    query = "SELECT pg_terminate_backend(pg_stat_activity.pid) FROM pg_stat_activity WHERE datname = '{0}' AND pid <> pg_backend_pid();".format(literal)
    return dbquery(query)
def dump_database_to_file(srcdb):
    """Dump *srcdb* into a new timestamped file under TMP_DIR.

    Args:
        srcdb: Name of the database to dump.

    Returns:
        Path of the dump file that was written.
    """
    dump_path = gettmpfile(srcdb, TMP_DIR)
    dbdump(srcdb, dump_path)
    return dump_path
def import_database_from_file(sourcedump, destdb, destowner):
    """Replace database *destdb* with the contents of the SQL file *sourcedump*.

    An existing *destdb* is first dumped to a timestamped file under TMP_DIR
    and then dropped. The database is recreated with *destowner* as owner,
    the dump is imported, and objects owned by the connecting role are
    reassigned to *destowner* when the two roles differ.

    Args:
        sourcedump: Path of the SQL file to import.
        destdb: Name of the database to (re)create.
        destowner: Role that should own the database and its objects.

    Returns:
        Path of the backup dump of the previous *destdb*, or None if the
        database did not exist beforehand.

    Raises:
        subprocess.CalledProcessError: If any PostgreSQL client program exits
            with a non-zero status.
    """
    destdump = None
    if dbexists(destdb):
        # Make a backup of the destination database before we nuke it
        destdump = gettmpfile(destdb, TMP_DIR)
        dbdump(destdb, destdump)
        dbdrop(destdb)

    # Create empty destination database and import dump file
    dbcreate(destdb, destowner)
    dbimport(destdb, sourcedump)

    # Change the owner of the database objects
    current_role = dbquery("SELECT current_user").strip().decode('utf-8')
    if current_role != destowner:
        # Double embedded double quotes so each role name remains a single
        # quoted identifier even if the name itself contains a '"'.
        query = 'REASSIGN OWNED BY "{0}" TO "{1}";'.format(
            current_role.replace('"', '""'),
            destowner.replace('"', '""'))
        dbquery(query, destdb)
    return destdump
def help():
    """Print a one-line usage summary and exit with status 0.

    The summary lists the options the argument parser actually accepts:
    exactly one source (``--from-db`` or ``--from-file``) and exactly one
    destination (``--to-db`` or ``--to-file``).

    Raises:
        SystemExit: Always, with exit code 0.
    """
    # The previous text advertised positional SOURCE_DATABASE / DEST_DATABASE
    # arguments, which the script does not accept.
    print('Usage: copydb.py (--from-db DATABASE | --from-file FILE) '
          '(--to-db DATABASE | --to-file FILE)')
    sys.exit(0)
if __name__ == '__main__':
    # Command line: exactly one source and exactly one destination.
    parser = argparse.ArgumentParser()
    src_group = parser.add_mutually_exclusive_group(required=True)
    src_group.add_argument('--from-db', help='from database')
    src_group.add_argument('--from-file', help='from SQL file')
    dest_group = parser.add_mutually_exclusive_group(required=True)
    dest_group.add_argument('--to-db', help='to database')
    dest_group.add_argument('--to-file', help='to SQL file')
    args = parser.parse_args()

    if args.from_file and args.to_file:
        print("Are you joking? This is not cp.")
        sys.exit(1)

    # Resolve the PostgreSQL user lazily. The former
    # os.environ.get('PGUSER', os.environ['USER']) evaluated os.environ['USER']
    # unconditionally and raised KeyError when USER was unset, even if PGUSER
    # was provided.
    PGSQL_USERNAME = (os.environ.get('PGUSER')
                      or os.environ.get('USER')
                      or getpass.getuser())
    PGSQL_PASSWORD = getpass.getpass(
        "Please provide password for PostgreSQL user `{0}': ".format(
            PGSQL_USERNAME))
    # The client programs started below pick the credentials up from the
    # environment.
    os.environ["PGUSER"] = PGSQL_USERNAME
    os.environ["PGPASSWORD"] = PGSQL_PASSWORD

    # Verify the credentials with a trivial query before doing any work.
    try:
        dbquery("SELECT current_user")
    except subprocess.CalledProcessError as exception:
        if exception.returncode == 2:
            print("Could not connect to database, is your password OK?")
        else:
            print("psql exited with status %d while testing the connection."
                  % exception.returncode)
        # A failed connection test is an error: report it with a non-zero
        # exit status (this used to exit with 0) and never carry on.
        sys.exit(1)

    # Temporary dump files created during this run; listed to the user at the end.
    dumpfiles = []

    # Step 1: obtain the source SQL file.
    if args.from_db:
        if not dbexists(args.from_db):
            print("Source database `%s' does not exist." % args.from_db)
            sys.exit(1)
        sourcedump = dump_database_to_file(args.from_db)
        dumpfiles.append(sourcedump)
    elif args.from_file:
        if not os.path.exists(args.from_file):
            print("Source file '%s' does not exist." % args.from_file)
            sys.exit(1)
        sourcedump = args.from_file

    # Step 2: deliver it to the destination.
    if args.to_db:
        db_exists = dbexists(args.to_db)
        # The destination is owned by the role named after the database when
        # such a role exists, otherwise by the connecting user.
        role_exists = dbroleexists(args.to_db)
        destowner = args.to_db if role_exists else PGSQL_USERNAME
        if not db_exists or not role_exists:
            if not db_exists:
                print("Destination database `%s' does not exist." % args.to_db)
            if not role_exists:
                print("Role `%s' does not exist, using `%s'." % (args.to_db, destowner))
            result = input('Continue? (Y/N) ')
            if not result.upper().startswith('Y'):
                print("Abort.")
                sys.exit(1)
        destdump = import_database_from_file(sourcedump, args.to_db, destowner)
        if destdump:
            dumpfiles.append(destdump)
    elif args.to_file:
        shutil.copy(sourcedump, args.to_file)

    if dumpfiles:
        print("Please remove the temporary database dump files (%s) if you're happy with the results of this script." % ', '.join(dumpfiles))

    # Drop the password from this process's environment once we are done.
    del os.environ["PGPASSWORD"]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment