Skip to content

Instantly share code, notes, and snippets.

@jourdanrodrigues
Created May 22, 2017 02:37
Show Gist options
  • Save jourdanrodrigues/979469232a831d1dfc16bababf3676bc to your computer and use it in GitHub Desktop.
Save jourdanrodrigues/979469232a831d1dfc16bababf3676bc to your computer and use it in GitHub Desktop.
Backup commands for Django.
from datetime import datetime
import os
from dj_database_url import parse as db_parse
from django.conf import settings
from django.core.management import BaseCommand
from assets.utils import console_log
BACKUPS_PATH = os.path.join(settings.BASE_DIR, 'backups')
def get_db_data(source):
"""
:type source: str
:rtype: dict
"""
database_url = os.getenv(source) or os.getenv('SOURCE_DATABASE_URL')
if not database_url:
message = console_log(
'You have to pass the "--source" parameter with the environment variable that has the URL '
'of the database or set "SOURCE_DATABASE_URL" in the environment to perform this action.', get_message=True
)
raise Exception(message)
return db_parse(database_url)
class Command(BaseCommand):
def add_arguments(self, parser):
parser.add_argument('--source', dest='source', type=str)
def handle(self, *args, **options):
db_data = get_db_data(options['source'] or '')
# Setup backup
if not os.path.isdir(BACKUPS_PATH):
os.makedirs(BACKUPS_PATH)
optional_params = [
'--port {}'.format(db_data.get('PORT') or 5432)
]
file_name = '{}_{}_{}.dump'.format(
datetime.now().strftime('%Y_%m_%d-%H_%M_%S'), db_data['USER'], db_data['NAME']
)
file_path = os.path.join(BACKUPS_PATH, file_name)
command = (
'{postgres_path}pg_dump --no-owner --host {host} --username "{username}" --no-password --format custom '
'--blobs --verbose {optional_params} --file "{file_path}" "{database}"'.format(
postgres_path=getattr(settings, 'POSTGRES_PATH', ''), file_path=file_path,
host=db_data['HOST'], username=db_data['USER'], database=db_data['NAME'],
optional_params=' '.join(optional_params)
)
)
console_log('Backing up database "{}" from "{}"...'.format(db_data['NAME'], db_data['HOST']))
if os.system(command) == 0:
console_log('Backup saved at :') # The returned value will be printed anyway
else:
raise Exception('Something went wrong with the backup.')
return file_path
import os
from dj_database_url import parse as db_parse
from django.conf import settings
from django.core.management import BaseCommand, call_command
from .perform_backup import BACKUPS_PATH, get_db_data as get_source_db_data
from assets.utils import console_log, check_command_code
def get_db_data(target):
"""
:param target:
:return:
"""
database_url = os.getenv(target) or os.getenv('TARGET_DATABASE_URL')
if not database_url:
message = console_log(
'You have to pass the "--target" parameter with the environment variable that has the URL '
'of the database or set "TARGET_DATABASE_URL" in the environment to perform this action.',
get_message=True
)
raise Exception(message)
return db_parse(database_url)
class Command(BaseCommand):
def add_arguments(self, parser):
parser.add_argument('--source', dest='source', type=str)
parser.add_argument('--target', dest='target', type=str)
parser.add_argument('--backup-file', dest='backup_file', type=str)
def handle(self, *args, **options):
source_db_data = get_source_db_data(options['source'] or '')
target_db_data = get_db_data(options['target'] or '')
postgres_path = getattr(settings, 'POSTGRES_PATH', '')
backup_file = options.get('backup_file')
if not backup_file:
backup_file_end = '{}_{}.dump'.format(source_db_data['USER'], source_db_data['NAME'])
files = [item for item in os.listdir(BACKUPS_PATH) if item.endswith(backup_file_end)]
if files:
files.sort(reverse=True)
backup_file = os.path.join(BACKUPS_PATH, files[0])
else:
backup_file = call_command('perform_backup', **options)
command_options = [
'--host {}'.format(target_db_data['HOST']),
'--username "{}"'.format(target_db_data['USER']),
'--no-password',
'--port {}'.format(target_db_data.get('PORT') or 5432)
]
console_log('Selected backup "{}".'.format(backup_file))
console_log('Start cleaning of database "{}" from "{}".'.format(target_db_data['NAME'], target_db_data['HOST']))
# Drop active connections to database
command = (
'{postgres_path}psql --dbname {database} {command_options} -c '
'"SELECT pg_terminate_backend(PID) FROM pg_stat_activity WHERE datname = \'{database}\'"'.format(
postgres_path=postgres_path,
database=target_db_data['NAME'],
command_options=' '.join(command_options)
)
)
console_log('Drop active connections...')
os.system(command) # If it does not work, just move on
# Drop the database
command = '{postgres_path}dropdb {command_options} "{database}"'.format(
postgres_path=postgres_path, database=target_db_data['NAME'],
command_options=' '.join(command_options)
)
console_log('Drop database...')
check_command_code(os.system(command), 'Database dropped.', 'Something went wrong.')
# Create the database
command = '{postgres_path}createdb {command_options} "{database}"'.format(
postgres_path=postgres_path, database=target_db_data['NAME'],
command_options=' '.join(command_options)
)
console_log('Create database...')
check_command_code(os.system(command), 'Database created.', 'Something went wrong.')
# Restore the database
command = (
'{postgres_path}pg_restore --host {host} --no-owner --role={username} --port {port} '
'--username "{username}" --no-password --dbname "{database}" --verbose {file}'.format(
postgres_path=postgres_path, host=target_db_data['HOST'], username=target_db_data['USER'],
port=target_db_data.get('PORT') or 5432, database=target_db_data['NAME'], file=backup_file
)
)
args = [os.path.basename(backup_file), target_db_data['NAME'], target_db_data['HOST']]
console_log('Restore "{}" to database "{}" from "{}".'.format(*args))
check_command_code(
os.system(command),
console_log('Backup "{}" restored to database "{}" from "{}"'.format(*args), get_message=True),
'Something went wrong.'
)
from django.core.management import BaseCommand, call_command
class Command(BaseCommand):
def add_arguments(self, parser):
parser.add_argument('--source', dest='source', type=str)
parser.add_argument('--target', dest='target', type=str)
def handle(self, *args, **options):
backup_file = call_command('perform_backup', **options)
options['backup_file'] = backup_file
call_command('restore_backup', **options)
import textwrap
def check_command_code(code, success_message, failure_message):
if code == 0:
console_log(success_message)
else:
raise Exception(failure_message)
def console_log(message, get_message=False, line_width=0):
"""
:type message: str
:type get_message: bool
:type line_width: int
"""
if line_width:
message = textwrap.fill(text=message, width=line_width)
message = '\033[94m{}\033[0m'.format(message)
if get_message:
return message
print(message)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment