Created
April 7, 2020 15:15
-
-
Save mjepronk/eb103fc4fb3c9bda30954992d1ff8cf1 to your computer and use it in GitHub Desktop.
Dump a PostgreSQL database to a file, import a database from a file, or copy one database to another.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import os | |
import sys | |
import shutil | |
import argparse | |
import getpass | |
import datetime | |
import subprocess | |
# Directory holding the PostgreSQL client programs used by this script.
_PG_BIN_DIR = "/usr/bin"

# Absolute paths of the client programs that get invoked as subprocesses.
PSQL_BIN = _PG_BIN_DIR + "/psql"
PGDUMP_BIN = _PG_BIN_DIR + "/pg_dump"
CREATEDB_BIN = _PG_BIN_DIR + "/createdb"
DROPDB_BIN = _PG_BIN_DIR + "/dropdb"

# Directory in which timestamped SQL dump files are written.
TMP_DIR = "/var/tmp"
def gettmpfile(db, tmpdir):
    """Build a timestamped dump-file path for *db* inside *tmpdir*.

    The file name has the form ``<db>-<YYYYmmddHHMMSS>.sql``, where the
    timestamp is taken from ``datetime.datetime.now()``.  Only the path is
    computed; no file is created.
    """
    stamp = "{:%Y%m%d%H%M%S}".format(datetime.datetime.now())
    filename = "{0}-{1}.sql".format(db, stamp)
    return os.path.join(tmpdir, filename)
def dbdump(db, dumpfile):
    """Dump database *db* into the file *dumpfile* using pg_dump.

    pg_dump is run with ``-E UTF8 --no-owner`` and its standard output is
    written to *dumpfile* (created or truncated).

    The command is passed as an argument list instead of a shell string, so
    database names and paths containing spaces or shell metacharacters are
    handed to pg_dump verbatim and cannot be interpreted by a shell.

    Raises:
        subprocess.CalledProcessError: pg_dump exited with a non-zero status.
        OSError: *dumpfile* could not be opened for writing.
    """
    cmd = [PGDUMP_BIN, '-E', 'UTF8', '--no-owner', db]
    # Replaces the shell's "> dumpfile" redirection.
    with open(dumpfile, 'wb') as out:
        subprocess.check_call(cmd, stdout=out)
def dbimport(db, dumpfile):
    """Feed the SQL file *dumpfile* to psql connected to database *db*.

    The file is supplied on psql's standard input.  The command is passed as
    an argument list instead of a shell string, so database names and paths
    containing spaces or shell metacharacters cannot be interpreted by a
    shell.

    Raises:
        subprocess.CalledProcessError: psql exited with a non-zero status.
        OSError: *dumpfile* could not be opened for reading.
    """
    cmd = [PSQL_BIN, db]
    # Replaces the shell's "< dumpfile" redirection.
    with open(dumpfile, 'rb') as src:
        subprocess.check_call(cmd, stdin=src)
def dbcreate(db, owner):
    """Create database *db*, owned by role *owner*, with UTF-8 encoding.

    Runs createdb with ``-O owner -E UTF-8``.  The command is passed as an
    argument list instead of a shell string, so names containing spaces or
    shell metacharacters cannot be interpreted by a shell.

    Raises:
        subprocess.CalledProcessError: createdb exited with a non-zero status.
    """
    subprocess.check_call([CREATEDB_BIN, '-O', owner, '-E', 'UTF-8', db])
def dbdrop(db):
    """Drop database *db* after asking the server to disconnect its users.

    The dropdb command is passed as an argument list instead of a shell
    string, so a name containing spaces or shell metacharacters cannot be
    interpreted by a shell.

    Raises:
        subprocess.CalledProcessError: the disconnect query or dropdb exited
            with a non-zero status.
    """
    dbdisconnectusers(db)
    subprocess.check_call([DROPDB_BIN, db])
def dbquery(query, db=None):
    """Execute *query* through psql and return its raw standard output.

    psql is invoked with ``-t -A -c query``; when *db* is truthy,
    ``-d db`` is appended to select the database.

    Returns:
        bytes: whatever psql wrote to standard output.

    Raises:
        subprocess.CalledProcessError: psql exited with a non-zero status.
    """
    target = ['-d', db] if db else []
    return subprocess.check_output([PSQL_BIN, '-t', '-A', '-c', query] + target)
def dbexists(db):
    """Return True if a database named *db* is listed in pg_database.

    Single quotes in *db* are doubled before the name is embedded in the SQL
    string literal, so a name containing ``'`` neither breaks the statement
    nor injects SQL.
    NOTE(review): doubling quotes is sufficient with the server setting
    ``standard_conforming_strings = on``; confirm that is in effect.

    Raises:
        subprocess.CalledProcessError: psql exited with a non-zero status.
    """
    literal = db.replace("'", "''")
    query = "SELECT 1 FROM pg_database WHERE datname = '{0}';".format(literal)
    # Empty output (no matching row) is treated as 0.
    return int(dbquery(query).strip() or 0) == 1
def dbroleexists(role):
    """Return True if a role named *role* is listed in pg_roles.

    Single quotes in *role* are doubled before the name is embedded in the
    SQL string literal, so a name containing ``'`` neither breaks the
    statement nor injects SQL.
    NOTE(review): doubling quotes is sufficient with the server setting
    ``standard_conforming_strings = on``; confirm that is in effect.

    Raises:
        subprocess.CalledProcessError: psql exited with a non-zero status.
    """
    literal = role.replace("'", "''")
    query = "SELECT 1 FROM pg_roles WHERE rolname = '{0}';".format(literal)
    # Empty output (no matching row) is treated as 0.
    return int(dbquery(query).strip() or 0) == 1
def dbdisconnectusers(db):
    """Ask the server to terminate every other backend connected to *db*.

    Single quotes in *db* are doubled before the name is embedded in the SQL
    string literal, so a name containing ``'`` neither breaks the statement
    nor injects SQL.

    Returns:
        bytes: the raw psql output of the pg_terminate_backend() query.

    Raises:
        subprocess.CalledProcessError: psql exited with a non-zero status.
    """
    # Original author's note: this doesn't seem to work...
    # NOTE(review): possibly a privilege issue, since terminating other
    # sessions may need elevated rights on the server - confirm.
    literal = db.replace("'", "''")
    query = "SELECT pg_terminate_backend(pg_stat_activity.pid) FROM pg_stat_activity WHERE datname = '{0}' AND pid <> pg_backend_pid();".format(literal)
    return dbquery(query)
def dump_database_to_file(srcdb):
    """Dump *srcdb* into a new timestamped file under TMP_DIR.

    Returns:
        str: path of the dump file that was written.
    """
    dump_path = gettmpfile(srcdb, TMP_DIR)
    dbdump(srcdb, dump_path)
    return dump_path
def _quote_ident(name):
    """Return *name* as a double-quoted SQL identifier, doubling embedded quotes."""
    return '"{0}"'.format(name.replace('"', '""'))


def import_database_from_file(sourcedump, destdb, destowner):
    """Replace database *destdb* with the contents of the SQL file *sourcedump*.

    If *destdb* already exists it is first dumped to a timestamped file under
    TMP_DIR and then dropped.  A fresh database owned by *destowner* is
    created and the dump is imported into it.  When the connected role
    differs from *destowner*, the objects it owns in the new database are
    reassigned to *destowner*.

    Args:
        sourcedump: path of the SQL file to import.
        destdb: name of the destination database.
        destowner: role that should own the database and its objects.

    Returns:
        The path of the backup dump of the previous *destdb*, or None when
        the database did not exist beforehand.

    Raises:
        subprocess.CalledProcessError: one of the PostgreSQL client programs
            exited with a non-zero status.
    """
    destdump = None
    if dbexists(destdb):
        # Make a backup of the destination database before we nuke it
        destdump = gettmpfile(destdb, TMP_DIR)
        dbdump(destdb, destdump)
        dbdrop(destdb)

    try:
        # Create empty destination database and import dump file
        dbcreate(destdb, destowner)
        dbimport(destdb, sourcedump)

        # Change the owner of the database objects
        current_role = dbquery("SELECT current_user").strip().decode('utf-8')
        if current_role != destowner:
            # Role names are quoted as identifiers; embedded double quotes
            # are doubled so unusual names cannot break the statement.
            query = 'REASSIGN OWNED BY {0} TO {1};'.format(
                _quote_ident(current_role), _quote_ident(destowner))
            dbquery(query, destdb)
    except (subprocess.CalledProcessError, OSError):
        # The old database is already gone at this point: tell the user
        # where its backup lives before the error propagates.
        if destdump:
            print("Import failed; the previous contents of `%s' were saved "
                  "to %s." % (destdb, destdump), file=sys.stderr)
        raise

    return destdump
def help():
    """Print a one-line usage summary and exit with status 0.

    The summary lists the --from-db/--from-file and --to-db/--to-file
    options that the script's argument parser defines, instead of a
    positional SOURCE/DEST form that the parser does not accept.
    """
    print('Usage: copydb.py (--from-db DB | --from-file FILE) '
          '(--to-db DB | --to-file FILE)')
    sys.exit(0)
if __name__ == '__main__':
    # Command line: exactly one source (--from-db | --from-file) and exactly
    # one destination (--to-db | --to-file) must be given.
    parser = argparse.ArgumentParser()
    src_group = parser.add_mutually_exclusive_group(required=True)
    src_group.add_argument('--from-db', help='from database')
    src_group.add_argument('--from-file', help='from SQL file')
    dest_group = parser.add_mutually_exclusive_group(required=True)
    dest_group.add_argument('--to-db', help='to database')
    dest_group.add_argument('--to-file', help='to SQL file')
    args = parser.parse_args()

    # Options are tested with "is not None" so that an empty-string value
    # still selects its branch instead of silently falling through.
    if args.from_file is not None and args.to_file is not None:
        print("Are you joking? This is not cp.")
        sys.exit(1)

    # PGUSER wins when set.  The fallbacks are only evaluated when needed, so
    # a missing USER variable no longer raises KeyError.
    PGSQL_USERNAME = (os.environ.get('PGUSER')
                      or os.environ.get('USER')
                      or getpass.getuser())
    PGSQL_PASSWORD = getpass.getpass(
        "Please provide password for PostgreSQL user `{0}': ".format(
            PGSQL_USERNAME))

    # The client programs are run as subprocesses and read their credentials
    # from these environment variables.
    os.environ["PGUSER"] = PGSQL_USERNAME
    os.environ["PGPASSWORD"] = PGSQL_PASSWORD

    # Probe the connection once up front.  Any failure is fatal and is
    # reported with a non-zero exit status.
    try:
        dbquery("SELECT current_user")
    except subprocess.CalledProcessError as exception:
        if exception.returncode == 2:
            print("Could not connect to database, is your password OK?")
        else:
            print("Connection check failed (psql exited with status %d)."
                  % exception.returncode)
        sys.exit(1)

    # Temporary dump files created during this run, reported at the end.
    dumpfiles = []

    # Step 1: obtain the SQL file to import or copy.
    if args.from_db is not None:
        if not dbexists(args.from_db):
            print("Source database `%s' does not exist." % args.from_db)
            sys.exit(1)
        sourcedump = dump_database_to_file(args.from_db)
        dumpfiles.append(sourcedump)
    else:
        if not os.path.exists(args.from_file):
            print("Source file '%s' does not exist." % args.from_file)
            sys.exit(1)
        sourcedump = args.from_file

    # Step 2: deliver it to the destination.
    if args.to_db is not None:
        db_exists = dbexists(args.to_db)
        # The destination is owned by the role named like the database when
        # such a role exists, otherwise by the connecting user.
        role_exists = dbroleexists(args.to_db)
        destowner = args.to_db if role_exists else PGSQL_USERNAME
        if not db_exists or not role_exists:
            if not db_exists:
                print("Destination database `%s' does not exist." % args.to_db)
            if not role_exists:
                print("Role `%s' does not exist, using `%s'." % (args.to_db, destowner))
            result = input('Continue? (Y/N) ')
            if not result.upper().startswith('Y'):
                print("Abort.")
                sys.exit(1)
        destdump = import_database_from_file(sourcedump, args.to_db, destowner)
        if destdump:
            dumpfiles.append(destdump)
    else:
        shutil.copy(sourcedump, args.to_file)

    if dumpfiles:
        print("Please remove the temporary database dump files (%s) if you're happy with the results of this script." % ', '.join(dumpfiles))

    del os.environ["PGPASSWORD"]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment