@ninapavlich
Last active July 13, 2018 18:54
Back up various database types
import datetime
import logging
import os

from django.conf import settings
from django.core.files import File
from django.core.files.storage import get_storage_class
from django.core.management.base import BaseCommand, CommandError
from django.utils import timezone as tz
from django.utils.text import slugify


class Command(BaseCommand):
    """
    Usage:

    To generate a database backup of the default database and send it to the
    default file storage, run the following command:

    > python manage.py backup_databases

    All available options:

    -m Max number of days to retain a database backup, defaults to 14 days
    -s Name of the settings variable that defines which storage backend to use
       for the backups, defaults to 'DEFAULT_FILE_STORAGE'
    -d Name of the database settings entry to use for the backup, defaults to 'default'
    -f Optional filename prefix for the database backup file. If specified, the
       file format will be: <prefix>_<datetime>.sql.zip
       If it's not specified, the host and database name will be used:
       <host>_<databasename>_<datetime>.sql.zip
    -g Optional directory within the storage backend in which to store the backups
    -t Temporary directory to store the database backup in before it's put in
       storage, defaults to /tmp
    -p Filename pattern to use for the datetime, defaults to %Y-%m-%d_%H-%M-%S

    To generate a database backup retained for up to 90 days and send it to a
    special storage bucket, run the following command:

    > python manage.py backup_databases -s=DATABASE_BACKUP_STORAGE -m=90
    """

    # Possible arguments:
    max_days = None
    storage_backend_setting = None
    database_to_backup = None
    filename_prefix = None
    storage_directory = None
    temporary_directory = None
    filename_date_pattern = None

    # Determined at runtime:
    database_settings = None
    backup_filename = None
    temporary_filename = None
    temporary_zip_filename = None
    storage_backend = None
    stored_file_url = None

    def add_arguments(self, parser):
        parser.add_argument('-m', '-max_days', action='store', dest='max_days',
                            help='Max age of backups in days', type=int, default=14)
        parser.add_argument('-s', '-storage_backend_setting', action='store', dest='storage_backend_setting',
                            help='Setting name for the backup storage backend', type=str, default='DEFAULT_FILE_STORAGE')
        parser.add_argument('-d', '-database_to_backup', action='store', dest='database_to_backup',
                            help='Database settings name to be backed up', type=str, default='default')
        parser.add_argument('-f', '-filename_prefix', action='store', dest='filename_prefix',
                            help='Prefix for the backup filename', type=str)
        parser.add_argument('-g', '-storage_directory', action='store', dest='storage_directory',
                            help='Directory in which to store files within storage', type=str)
        parser.add_argument('-t', '-temporary_directory', action='store', dest='temporary_directory',
                            help='Directory in which to temporarily store the backup', type=str, default='/tmp')
        parser.add_argument('-p', '-filename_date_pattern', action='store', dest='filename_date_pattern',
                            help='Datetime pattern to use in the backup filename', type=str, default='%Y-%m-%d_%H-%M-%S')

    def handle(self, *args, **options):
        self.max_days = int(options['max_days'])
        self.storage_backend_setting = str(options['storage_backend_setting'] or '')
        self.database_to_backup = str(options['database_to_backup'] or '')
        self.filename_prefix = str(options['filename_prefix'] or '')
        self.storage_directory = str(options['storage_directory'] or '')
        self.temporary_directory = str(options['temporary_directory'] or '')
        self.filename_date_pattern = str(options['filename_date_pattern'] or '')

        self.database_settings = settings.DATABASES[self.database_to_backup]
        self.backup_filename = self.get_file_backup_name(
            self.filename_prefix, self.database_settings, self.filename_date_pattern)
        self.temporary_filename = os.path.join(
            self.temporary_directory, self.backup_filename)
        logging.debug(u"Temporary database backup file name is %s" %
                      (self.temporary_filename))

        storage_name = getattr(settings, self.storage_backend_setting, None)
        self.storage_backend = get_storage_class(storage_name)()

        # Generate SQL and save to local temp file
        response = self.generate_backup(
            self.temporary_filename, self.database_settings)

        # Zip SQL
        self.temporary_zip_filename = self.zip_backup(self.temporary_filename)
        logging.debug(u"Database backup zipped up to %s" %
                      (self.temporary_zip_filename))

        # Store zip file
        self.stored_file_url = self.store_backup(
            self.storage_backend, self.storage_directory, self.temporary_zip_filename)
        logging.info(u"Database backup stored at %s" %
                     (self.stored_file_url))

        # Clean up temporary files
        self.cleanup(self.temporary_filename, self.temporary_zip_filename)

        # Delete old files
        self.rotate_backups(self.storage_backend, self.storage_directory,
                            self.max_days)

    def get_file_backup_name(self, filename_prefix, database_settings, filename_date_pattern):
        now = datetime.datetime.now()
        now_string = now.strftime(filename_date_pattern)

        if filename_prefix:
            filename = slugify(u"%s_%s" % (filename_prefix, now_string))
        else:
            filename = slugify(u"%s_%s_%s" % (
                database_settings['HOST'], database_settings['NAME'], now_string))

        extension = '.sql'
        if database_settings['ENGINE'] in ('django.db.backends.postgresql',
                                           'django.db.backends.postgresql_psycopg2'):
            extension = '.pgsql'

        filename_full = u"%s%s" % (filename, extension)
        return filename_full

    def generate_backup(self, temporary_filename, database_settings):
        database_engine = database_settings['ENGINE']
        if database_engine == 'django.db.backends.mysql':
            return self.generate_backup_mysql(temporary_filename, database_settings)
        elif database_engine in ('django.db.backends.postgresql',
                                 'django.db.backends.postgresql_psycopg2'):
            return self.generate_backup_postgres(temporary_filename, database_settings)
        elif database_engine == 'django.db.backends.sqlite3':
            return self.generate_backup_sqlite(temporary_filename, database_settings)

        raise NotImplementedError(
            u'No database backup mechanism found for database engine of type %s' % (database_engine))

    def generate_backup_mysql(self, temporary_filename, database_settings):
        MYSQL_CMD = 'mysqldump'
        user = database_settings['USER']
        password = database_settings['PASSWORD']
        name = database_settings['NAME']
        port = database_settings['PORT']
        host = database_settings['HOST']

        # mysqldump -P 3310 -h 127.0.0.1 -u mysql_user -p database_name table_name
        cmd = "%(mysqldump)s -P %(port)s -h %(host)s -u %(user)s --password='%(password)s' %(database)s > %(temporary_filename)s" % {
            'mysqldump': MYSQL_CMD,
            'port': port,
            'host': host,
            'user': user,
            'password': password,
            'database': name,
            'temporary_filename': temporary_filename}
        logging.debug("Backing up with command %s " % cmd)
        return os.system(cmd)

    def generate_backup_postgres(self, temporary_filename, database_settings):
        PG_CMD = 'pg_dump'
        user = database_settings['USER']
        password = database_settings['PASSWORD']
        name = database_settings['NAME']
        port = database_settings['PORT']
        host = database_settings['HOST']

        # export PGPASSWORD=db_password; pg_dump -h 127.0.0.1 -U db_username db_name > path/to/file.sql
        cmd = 'export PGPASSWORD="%(password)s"; %(pgdump)s -h %(host)s -U %(user)s %(database)s > %(temporary_filename)s' % {
            'password': password,
            'pgdump': PG_CMD,
            'host': host,
            'user': user,
            'database': name,
            'temporary_filename': temporary_filename}
        logging.debug("Backing up with command %s " % cmd)
        return os.system(cmd)

    def generate_backup_sqlite(self, temporary_filename, database_settings):
        import sqlite3

        # SQLite only needs the database file path; USER/PASSWORD/HOST/PORT do not apply
        name = database_settings['NAME']
        con = sqlite3.connect(name)
        with open(temporary_filename, 'w') as f:
            for line in con.iterdump():
                f.write('%s\n' % line)

    def zip_backup(self, temporary_filename):
        ZIP_CMD = 'zip'
        zipfile_name = u"%s.zip" % (temporary_filename)
        if os.path.exists(zipfile_name):
            os.remove(zipfile_name)

        zip_cmds = {'zip': ZIP_CMD, 'zipfile': zipfile_name,
                    'file': temporary_filename}

        # Create the archive
        os.system("%(zip)s -q -9 %(zipfile)s %(file)s" % zip_cmds)

        # Test the archive
        if not os.system("%(zip)s -T -D -q %(zipfile)s" % zip_cmds):
            return zipfile_name
        else:
            return None

    def store_backup(self, storage_backend, storage_directory, temporary_zip_filename):
        fn = os.path.join(storage_directory, os.path.split(
            temporary_zip_filename)[1])
        with open(temporary_zip_filename, 'rb') as f:
            django_file = File(f, fn)
            path = storage_backend.save(fn, django_file)
        url = storage_backend.url(path)
        return url

    def cleanup(self, temporary_filename, temporary_zip_filename):
        # Remove SQL file
        if os.path.exists(temporary_filename):
            os.remove(temporary_filename)

        # Remove zip file
        if os.path.exists(temporary_zip_filename):
            os.remove(temporary_zip_filename)

    def rotate_backups(self, storage_backend, storage_directory, max_days):
        dir_to_list = os.path.join(storage_directory, '.')
        files_in_backup_dir = storage_backend.listdir(dir_to_list)[1]

        max_age = datetime.datetime.now() - datetime.timedelta(days=max_days)
        dt = tz.make_aware(max_age, tz.utc)
        max_age_tz_aware = dt if settings.USE_TZ else tz.make_naive(dt)
        logging.debug(u"If we store backups from the last %s days, then the maximum age for a backup is %s" % (
            max_days, max_age))

        for filename in files_in_backup_dir:
            file_path = os.path.join(storage_directory, filename)
            date = storage_backend.get_modified_time(file_path)
            # date = self.get_date_from_filename(
            #     filename, filename_prefix, database_to_backup,
            #     filename_date_pattern)
            if date < max_age_tz_aware:
                logging.info(u"Deleting expired backup from %s (%s)" %
                             (date, filename))
                storage_backend.delete(file_path)
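
The -s option just names a settings variable that holds the dotted path of a storage class; the command resolves it with getattr() and get_storage_class(). A minimal sketch of what a dedicated backup backend might look like in settings.py, assuming django-storages with an S3 bucket (the setting values below are illustrative, not part of this gist):

    # settings.py -- hypothetical example of a dedicated backup storage backend.
    # The command looks this value up by the name passed with -s and
    # instantiates it via get_storage_class().
    DATABASE_BACKUP_STORAGE = 'storages.backends.s3boto3.S3Boto3Storage'
    AWS_STORAGE_BUCKET_NAME = 'my-database-backups'  # illustrative bucket name

With something like that in place, running python manage.py backup_databases -s=DATABASE_BACKUP_STORAGE -m=90 (for example from a nightly cron job) pushes the zipped dumps to that bucket and prunes anything older than 90 days.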