Skip to content

Instantly share code, notes, and snippets.

@ryanpadilha
Forked from valferon/postgres_manager.py
Created May 7, 2020 04:17
Show Gist options
  • Save ryanpadilha/b55120f7b16bb587d22541d230c10602 to your computer and use it in GitHub Desktop.
Save ryanpadilha/b55120f7b16bb587d22541d230c10602 to your computer and use it in GitHub Desktop.
Python script to take care of postgres backup and restore of data
#!/usr/bin/python3
import argparse
import logging
import subprocess
import os
import tempfile
from tempfile import mkstemp
import configparser
import gzip
import boto3
import psycopg2
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
# Amazon S3 settings.
# AWS_ACCESS_KEY_ID in ~/.aws/credentials
# AWS_SECRET_ACCESS_KEY in ~/.aws/credentials
import datetime
from shutil import move
AWS_BUCKET_NAME = 'backup.mydomain.com'
AWS_BUCKET_PATH = 'postgres/'
BACKUP_PATH = '/tmp/'
def upload_to_s3(file_full_path, dest_file):
    """
    Send a local file to the backup bucket and delete the local copy.

    The object key is ``AWS_BUCKET_PATH`` followed by ``dest_file``. The local
    file is removed only after the upload call returns. If boto3 reports an
    ``S3UploadFailedError`` the error is printed and the process exits with
    status 1.

    :param file_full_path: path of the local file to upload
    :param dest_file: object name to append to ``AWS_BUCKET_PATH``
    """
    client = boto3.client('s3')
    object_key = AWS_BUCKET_PATH + dest_file
    try:
        client.upload_file(file_full_path, AWS_BUCKET_NAME, object_key)
        os.remove(file_full_path)
    except boto3.exceptions.S3UploadFailedError as upload_error:
        print(upload_error)
        exit(1)
def download_from_s3(backup_s3_key, dest_file):
    """
    Download an object from the backup bucket to a local file.

    Any exception raised while downloading is printed and the process exits
    with status 1.

    :param backup_s3_key: full key of the object inside ``AWS_BUCKET_NAME``
    :param dest_file: local path the object is written to
    """
    s3_resource = boto3.resource('s3')
    try:
        s3_resource.meta.client.download_file(
            AWS_BUCKET_NAME,
            backup_s3_key,
            dest_file,
        )
    except Exception as download_error:
        print(download_error)
        exit(1)
def list_available_backup():
    """
    Return the keys of every object stored under the backup prefix.

    Lists ``AWS_BUCKET_NAME`` with ``Prefix=AWS_BUCKET_PATH``. An empty prefix
    yields an empty list (S3 omits ``Contents`` from the response in that
    case), and truncated listings are followed with the continuation token so
    that buckets holding more than one page of backups are listed completely.

    :return: list of object keys (strings), in the order S3 returns them
    """
    s3_client = boto3.client('s3')
    request = {'Bucket': AWS_BUCKET_NAME, 'Prefix': AWS_BUCKET_PATH}
    key_list = []
    while True:
        page = s3_client.list_objects_v2(**request)
        # 'Contents' is absent, not empty, when nothing matches the prefix.
        key_list.extend(entry['Key'] for entry in page.get('Contents', []))
        if not page.get('IsTruncated'):
            break
        request['ContinuationToken'] = page['NextContinuationToken']
    return key_list
def list_postgres_databases(host, database_name, port, user, password):
    """
    Run ``psql --list`` against the server and return its raw output.

    The user, password and database name are percent-encoded before being
    placed in the connection URI, so values containing characters such as
    ``@``, ``/``, ``:`` or spaces no longer corrupt the URI. Pass the plain
    (not pre-encoded) values.

    If ``psql`` cannot be started or returns a non-zero status, a message is
    printed and the process exits with status 1.

    :return: bytes captured from psql's standard output
    """
    from urllib.parse import quote

    dsn = 'postgresql://{}:{}@{}:{}/{}'.format(
        quote(str(user), safe=''),
        quote(str(password), safe=''),
        host,
        port,
        quote(str(database_name), safe=''),
    )
    try:
        process = subprocess.Popen(
            ['psql', '--dbname={}'.format(dsn), '--list'],
            stdout=subprocess.PIPE,
        )
        output = process.communicate()[0]
    except Exception as e:
        print(e)
        raise SystemExit(1)
    if process.returncode != 0:
        print('Command failed. Return code : {}'.format(process.returncode))
        raise SystemExit(1)
    return output
def backup_postgres_db(host, database_name, port, user, password, dest_file, verbose):
    """
    Dump a postgres database to ``dest_file`` with ``pg_dump``.

    The dump is always written in custom format (``-Fc``) because the restore
    path feeds it to ``pg_restore``, which cannot read plain SQL dumps.
    Previously only the verbose branch requested the custom format.
    ``verbose`` now only controls whether ``-v`` is passed.

    The user, password and database name are percent-encoded before being
    placed in the connection URI; pass the plain values.

    If ``pg_dump`` cannot be started or returns a non-zero status, a message
    is printed and the process exits with status 1.

    :return: bytes captured from pg_dump's standard output
    """
    from urllib.parse import quote

    dsn = 'postgresql://{}:{}@{}:{}/{}'.format(
        quote(str(user), safe=''),
        quote(str(password), safe=''),
        host,
        port,
        quote(str(database_name), safe=''),
    )
    command = ['pg_dump', '--dbname={}'.format(dsn), '-Fc', '-f', dest_file]
    if verbose:
        command.append('-v')
    try:
        process = subprocess.Popen(command, stdout=subprocess.PIPE)
        output = process.communicate()[0]
    except Exception as e:
        print(e)
        raise SystemExit(1)
    if process.returncode != 0:
        print('Command failed. Return code : {}'.format(process.returncode))
        raise SystemExit(1)
    return output
def compress_file(src_file):
    """
    Gzip ``src_file`` into ``<src_file>.gz`` and return the new path.

    The source file is left in place. Data is copied in fixed-size chunks
    rather than "line by line", so binary dumps without newline bytes do not
    get buffered in memory as one huge line.

    :param src_file: path of the file to compress
    :return: path of the compressed file
    """
    from shutil import copyfileobj

    compressed_file = "{}.gz".format(str(src_file))
    with open(src_file, 'rb') as f_in, gzip.open(compressed_file, 'wb') as f_out:
        copyfileobj(f_in, f_out)
    return compressed_file
def extract_file(src_file):
    """
    Gunzip ``src_file`` next to itself and return the extracted path.

    The output path is ``src_file`` with its last extension removed
    (``restore.dump.gz`` -> ``restore.dump``) and is printed to stdout.

    :param src_file: path of the gzip archive
    :return: path of the extracted file
    :raises ValueError: if ``src_file`` has no extension; the output path
        would then equal the input path and opening it for writing would
        truncate the archive before it is read
    """
    from shutil import copyfileobj

    extracted_file, extension = os.path.splitext(src_file)
    if not extension:
        raise ValueError(
            "Cannot derive an output name from {!r}: no file extension".format(src_file)
        )
    print(extracted_file)
    with gzip.open(src_file, 'rb') as f_in, open(extracted_file, 'wb') as f_out:
        # Chunked copy: binary dumps are not line-oriented.
        copyfileobj(f_in, f_out)
    return extracted_file
def remove_faulty_statement_from_dump(src_file):
    """
    Rewrite ``src_file`` without its ``EXTENSION - plpgsql`` archive entries.

    Steps:
      1. ``pg_restore -l`` lists the archive's table of contents.
      2. Entries containing ``EXTENSION - plpgsql`` are dropped from the list.
      3. ``pg_restore -L <list> -f <out>`` regenerates the dump from the
         remaining entries, and the result atomically replaces ``src_file``.

    NOTE(review): as in the original design (pg_restore output redirected into
    ``src_file``), the rewritten file is a plain SQL script, not a
    custom-format archive. It must be loaded with ``psql``, not ``pg_restore``.
    Confirm this is what callers want before enabling the call site.

    If either ``pg_restore`` invocation returns a non-zero status, a message
    is printed and the process exits with status 1; ``src_file`` is left
    untouched in that case.

    :param src_file: path of a pg_dump custom-format archive
    :return: ``src_file`` on success, ``None`` if an unexpected error occurred
    """
    # Work beside the source file so the final os.replace stays on one
    # filesystem and is therefore atomic.
    work_dir = os.path.dirname(os.path.abspath(src_file))
    list_path = None
    cleaned_path = None
    try:
        listing = subprocess.Popen(
            ['pg_restore', '-l', src_file],
            stdout=subprocess.PIPE,
        )
        toc = listing.communicate()[0]
        if listing.returncode != 0:
            print('Command failed. Return code : {}'.format(listing.returncode))
            raise SystemExit(1)

        list_fd, list_path = tempfile.mkstemp(suffix='.list', dir=work_dir)
        with os.fdopen(list_fd, 'wb') as list_file:
            list_file.writelines(
                entry for entry in toc.splitlines(True)
                if b'EXTENSION - plpgsql' not in entry
            )

        cleaned_fd, cleaned_path = tempfile.mkstemp(suffix='.sql', dir=work_dir)
        os.close(cleaned_fd)
        status = subprocess.call(
            ['pg_restore', '-L', list_path, '-f', cleaned_path, src_file]
        )
        if status != 0:
            print('Command failed. Return code : {}'.format(status))
            raise SystemExit(1)

        os.replace(cleaned_path, src_file)
        cleaned_path = None  # moved into place, nothing left to clean up
        return src_file
    except Exception as e:
        print("Issue when modifying dump : {}".format(e))
        return None
    finally:
        for leftover in (list_path, cleaned_path):
            if leftover and os.path.exists(leftover):
                os.remove(leftover)
def change_user_from_dump(source_dump_path, old_user, new_user):
    """
    Replace every occurrence of ``old_user`` with ``new_user`` in a text dump.

    The rewritten content is staged in a temporary file; the original file is
    then removed and the temporary file is moved to ``source_dump_path``.

    :param source_dump_path: path of the text dump to rewrite in place
    :param old_user: substring to search for
    :param new_user: replacement text
    """
    staged_fd, staged_path = mkstemp()
    with os.fdopen(staged_fd, 'w') as staged, open(source_dump_path) as original:
        staged.writelines(
            row.replace(old_user, new_user) for row in original
        )
    # Drop the original, then put the rewritten copy in its place.
    os.remove(source_dump_path)
    move(staged_path, source_dump_path)
def restore_postgres_db(db_host, db, port, user, password, backup_file, verbose):
    """
    Restore ``backup_file`` into database ``db`` with ``pg_restore --no-owner``.

    The user, password and database name are percent-encoded before being
    placed in the connection URI; pass the plain values. In verbose mode the
    connection target is printed (never the password) and ``-v`` is passed to
    ``pg_restore``.

    A non-zero ``pg_restore`` status is reported but does not abort, and the
    captured output is still returned. If ``pg_restore`` cannot be started at
    all, the error is printed and the process exits with status 1 instead of
    returning ``None`` to a caller that expects output.

    :return: bytes captured from pg_restore's standard output
    """
    from urllib.parse import quote

    dsn = 'postgresql://{}:{}@{}:{}/{}'.format(
        quote(str(user), safe=''),
        quote(str(password), safe=''),
        db_host,
        port,
        quote(str(db), safe=''),
    )
    command = ['pg_restore', '--no-owner', '--dbname={}'.format(dsn)]
    if verbose:
        # Deliberately omit the password from this diagnostic line.
        print(user, db_host, port, db)
        command.append('-v')
    command.append(backup_file)

    try:
        process = subprocess.Popen(command, stdout=subprocess.PIPE)
        output = process.communicate()[0]
    except Exception as e:
        print("Issue with the db restore : {}".format(e))
        raise SystemExit(1)
    if process.returncode != 0:
        print('Command failed. Return code : {}'.format(process.returncode))
    return output
def create_db(db_host, database, db_port, user_name, user_password):
    """
    Drop (if present) and recreate ``database``, granting all privileges on it
    to ``user_name``.

    Connects to the ``postgres`` maintenance database in autocommit mode,
    since ``CREATE``/``DROP DATABASE`` cannot run inside a transaction. If the
    connection cannot be opened, the error is printed and the process exits
    with status 1.

    ``DROP DATABASE IF EXISTS`` is used so that a missing database is not an
    error, while genuine failures (for example the database being in use)
    surface with their real message. Identifiers are quoted with
    ``psycopg2.sql.Identifier``, so the database is created under exactly the
    name given and names with upper-case or special characters are safe.

    :return: the ``database`` name that was created
    """
    from psycopg2 import sql

    try:
        con = psycopg2.connect(dbname='postgres', port=db_port,
                               user=user_name, host=db_host,
                               password=user_password)
    except Exception as e:
        print(e)
        raise SystemExit(1)

    try:
        con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
        cur = con.cursor()
        db_ident = sql.Identifier(database)
        cur.execute(sql.SQL("DROP DATABASE IF EXISTS {}").format(db_ident))
        cur.execute(sql.SQL("CREATE DATABASE {}").format(db_ident))
        cur.execute(
            sql.SQL("GRANT ALL PRIVILEGES ON DATABASE {} TO {}").format(
                db_ident, sql.Identifier(user_name)
            )
        )
    finally:
        # Release the server connection even when a statement fails.
        con.close()
    return database
def swap_restore_active(db_host, restore_database, active_database, db_port, user_name, user_password):
    """
    Replace the active database with the freshly restored one.

    Connected to the ``postgres`` maintenance database in autocommit mode,
    this:
      1. terminates every other backend connected to ``active_database``,
      2. drops ``active_database``,
      3. renames ``restore_database`` to ``active_database``.

    The database name in the ``pg_stat_activity`` filter is passed as a query
    parameter, and all identifiers are quoted with ``psycopg2.sql.Identifier``
    so the DROP and the RENAME refer to exactly the same, case-sensitive name.

    Any failure is printed and the process exits with status 1. The
    connection is always closed.
    """
    from psycopg2 import sql

    con = None
    try:
        con = psycopg2.connect(dbname='postgres', port=db_port,
                               user=user_name, host=db_host,
                               password=user_password)
        con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
        cur = con.cursor()
        cur.execute("SELECT pg_terminate_backend( pid ) "
                    "FROM pg_stat_activity "
                    "WHERE pid <> pg_backend_pid( ) "
                    "AND datname = %s", (active_database,))
        cur.execute(
            sql.SQL("DROP DATABASE {}").format(sql.Identifier(active_database))
        )
        cur.execute(
            sql.SQL("ALTER DATABASE {} RENAME TO {}").format(
                sql.Identifier(restore_database),
                sql.Identifier(active_database),
            )
        )
    except Exception as e:
        print(e)
        raise SystemExit(1)
    finally:
        if con is not None:
            con.close()
def swap_restore_new(db_host, restore_database, new_database, db_port, user_name, user_password):
    """
    Rename the restored database to ``new_database``.

    Runs ``ALTER DATABASE ... RENAME TO ...`` while connected to the
    ``postgres`` maintenance database in autocommit mode. Identifiers are
    quoted with ``psycopg2.sql.Identifier`` (which also escapes embedded
    double quotes).

    Any failure is printed and the process exits with status 1. The
    connection is always closed.
    """
    from psycopg2 import sql

    con = None
    try:
        con = psycopg2.connect(dbname='postgres', port=db_port,
                               user=user_name, host=db_host,
                               password=user_password)
        con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
        cur = con.cursor()
        cur.execute(
            sql.SQL("ALTER DATABASE {} RENAME TO {}").format(
                sql.Identifier(restore_database),
                sql.Identifier(new_database),
            )
        )
    except Exception as e:
        print(e)
        raise SystemExit(1)
    finally:
        if con is not None:
            con.close()
def main():
    """
    Command-line entry point: list backups, list databases, back up or restore.

    Connection settings are read from the ``[postgresql]`` section of the file
    given with ``--configfile`` (keys: host, port, db, user, password).

    Actions (``--action``):
      * ``list``     - log every key under the S3 backup prefix.
      * ``list_dbs`` - log the output of ``psql --list``.
      * ``backup``   - dump the database, gzip it and upload it to S3.
      * ``restore``  - download the first backup whose key contains ``--date``,
        restore it into ``<db>_restore``, then rename it to ``--dest-db`` or
        swap it in place of the active database.

    ``--verbose`` takes an optional boolean-like value; ``false``, ``0``,
    ``no``, ``n`` and ``off`` (any case) disable verbosity, anything else
    enables it. It defaults to enabled.
    """
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.INFO)
    handler = logging.StreamHandler()
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)

    def parse_bool(value):
        # Without a converter argparse keeps the raw string, and any
        # non-empty string (including "False") is truthy.
        return value.strip().lower() not in ('', '0', 'false', 'no', 'n', 'off')

    def log_output(output):
        # Subprocess output arrives as bytes; decode it so the log shows text
        # rather than b'...' reprs. Empty output logs nothing.
        if not output:
            return
        if isinstance(output, bytes):
            output = output.decode('utf-8', errors='replace')
        for line in output.splitlines():
            logger.info(line)

    args_parser = argparse.ArgumentParser(description='Postgres database management')
    args_parser.add_argument("--action",
                             metavar="action",
                             choices=['list', 'list_dbs', 'restore', 'backup'],
                             required=True)
    args_parser.add_argument("--date",
                             metavar="YYYY-MM-dd",
                             help="Date to use for restore (show with --action list)")
    args_parser.add_argument("--dest-db",
                             metavar="dest_db",
                             default=None,
                             help="Name of the new restored database")
    args_parser.add_argument("--verbose",
                             type=parse_bool,
                             default=True,
                             help="verbose output")
    args_parser.add_argument("--configfile",
                             required=True,
                             help="Database configuration file")
    args = args_parser.parse_args()

    config = configparser.ConfigParser()
    # ConfigParser.read() silently skips unreadable files; fail early with a
    # clear message instead of a NoSectionError further down.
    if not config.read(args.configfile):
        args_parser.error('Cannot read configuration file: {}'.format(args.configfile))

    postgres_host = config.get('postgresql', 'host')
    postgres_port = config.get('postgresql', 'port')
    postgres_db = config.get('postgresql', 'db')
    postgres_restore = "{}_restore".format(postgres_db)
    postgres_user = config.get('postgresql', 'user')
    postgres_password = config.get('postgresql', 'password')

    timestr = datetime.datetime.now().strftime('%Y%m%d-%H%M%S')
    filename = 'backup-{}-{}.dump'.format(timestr, postgres_db)
    filename_compressed = '{}.gz'.format(filename)
    restore_filename = '/tmp/restore.dump.gz'
    local_file_path = '{}{}'.format(BACKUP_PATH, filename)

    # list task
    if args.action == "list":
        logger.info('Listing S3 bucket s3://{}/{} content :'.format(AWS_BUCKET_NAME,
                                                                    AWS_BUCKET_PATH))
        s3_backup_objects = list_available_backup()
        for key in s3_backup_objects:
            logger.info("Key : {}".format(key))
    # list databases task
    elif args.action == "list_dbs":
        result = list_postgres_databases(postgres_host,
                                         postgres_db,
                                         postgres_port,
                                         postgres_user,
                                         postgres_password)
        log_output(result)
    # backup task
    elif args.action == "backup":
        logger.info('Backing up {} database to {}'.format(postgres_db, local_file_path))
        result = backup_postgres_db(postgres_host,
                                    postgres_db,
                                    postgres_port,
                                    postgres_user,
                                    postgres_password,
                                    local_file_path, args.verbose)
        log_output(result)
        logger.info("Backup complete")
        logger.info("Compressing {}".format(local_file_path))
        comp_file = compress_file(local_file_path)
        logger.info('Uploading {} to Amazon S3...'.format(comp_file))
        upload_to_s3(comp_file, filename_compressed)
        logger.info("Uploaded to {}".format(filename_compressed))
    # restore task
    elif args.action == "restore":
        if not args.date:
            logger.warning('No date was chosen for restore. Run again with the "list" '
                           'action to see available restore dates')
        else:
            # Clear any archive left over from a previous run.
            try:
                os.remove(restore_filename)
            except FileNotFoundError:
                pass
            except OSError as e:
                logger.info(e)

            all_backup_keys = list_available_backup()
            backup_match = [s for s in all_backup_keys if args.date in s]
            if backup_match:
                logger.info("Found the following backup : {}".format(backup_match))
            else:
                logger.error("No match found for backups with date : {}".format(args.date))
                logger.info("Available keys : {}".format([s for s in all_backup_keys]))
                raise SystemExit(1)

            logger.info("Downloading {} from S3 into : {}".format(backup_match[0], restore_filename))
            download_from_s3(backup_match[0], restore_filename)
            logger.info("Download complete")

            logger.info("Extracting {}".format(restore_filename))
            ext_file = extract_file(restore_filename)
            # cleaned_ext_file = remove_faulty_statement_from_dump(ext_file)
            logger.info("Extracted to : {}".format(ext_file))

            logger.info("Creating temp database for restore : {}".format(postgres_restore))
            tmp_database = create_db(postgres_host,
                                     postgres_restore,
                                     postgres_port,
                                     postgres_user,
                                     postgres_password)
            logger.info("Created temp database for restore : {}".format(tmp_database))

            logger.info("Restore starting")
            # Restore the file that was actually extracted rather than a
            # second hard-coded copy of its path.
            result = restore_postgres_db(postgres_host,
                                         postgres_restore,
                                         postgres_port,
                                         postgres_user,
                                         postgres_password,
                                         ext_file,
                                         args.verbose)
            if result is None:
                # No output object means pg_restore never ran; do not swap an
                # empty database over the active one.
                logger.error("Restore did not run, leaving existing databases untouched")
                raise SystemExit(1)
            log_output(result)
            logger.info("Restore complete")

            if args.dest_db is not None:
                restored_db_name = args.dest_db
                logger.info("Switching restored database with new one : {} > {}".format(
                    postgres_restore, restored_db_name
                ))
                swap_restore_new(postgres_host,
                                 postgres_restore,
                                 restored_db_name,
                                 postgres_port,
                                 postgres_user,
                                 postgres_password)
            else:
                restored_db_name = postgres_db
                logger.info("Switching restored database with active one : {} > {}".format(
                    postgres_restore, restored_db_name
                ))
                swap_restore_active(postgres_host,
                                    postgres_restore,
                                    restored_db_name,
                                    postgres_port,
                                    postgres_user,
                                    postgres_password)
            logger.info("Database restored and active.")
    else:
        # Not reachable while argparse enforces ``choices``; kept as a guard
        # in case the choices list and the branches above drift apart.
        logger.warning("No valid argument was given.")
        logger.warning(args)


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment