Skip to content

Instantly share code, notes, and snippets.

@mcuelenaere
Last active March 17, 2022 10:36
Show Gist options
  • Save mcuelenaere/d826808a0e806b36c1bf44a7b741be6b to your computer and use it in GitHub Desktop.
Save mcuelenaere/d826808a0e806b36c1bf44a7b741be6b to your computer and use it in GitHub Desktop.
Script to automate creating a MySQL backup using LVM snapshots, over multiple disks on AWS
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import boto3
import re
import subprocess
import os, os.path
import tempfile
import shutil
from datetime import datetime
from itertools import groupby
from pymysql import connect as mysql_connect, cursors
from pymysql.err import OperationalError
from socket import gethostname
from traceback import print_exc
from urllib.request import urlopen
def read_file(filename):
with open(filename, 'r') as f:
return f.read()
def read_url(url):
return urlopen(url).read()
def extract_mysql_details():
username, password = (None, None)
matcher = re.compile(r'^([\S]+)\s*=\s*(\S+)$')
paths = list(filter(lambda x: os.path.exists(x), [ os.path.expanduser('~/.my.cnf'), '/etc/mysql/debian.cnf' ]))
for path in paths:
for line in read_file(path).splitlines():
m = matcher.match(line)
if m is None:
continue
if m.group(1) == 'user':
username = m.group(2)
elif m.group(1) == 'password':
password = m.group(2)
if len(username) and len(password):
return (username, password)
raise RuntimeError("Failed to extract mysql details in paths: %s" % "\n".join(paths))
def fetch_my_instance_id():
return read_url('http://169.254.169.254/latest/meta-data/instance-id')
class MySQLLockTablesTimeoutError(RuntimeError):
pass
class MySQL():
def connect(self, read_timeout=None):
(username, password) = extract_mysql_details()
self.dbconn = mysql_connect(
unix_socket='/var/run/mysqld/mysqld.sock',
user=username,
passwd=password,
read_timeout=read_timeout
)
def lock_tables(self):
cursor = self.dbconn.cursor()
try:
cursor.execute("FLUSH TABLES WITH READ LOCK")
except OperationalError as e:
if e.args[0] == 2013:
raise MySQLLockTablesTimeoutError()
else:
raise
finally:
cursor.close()
def get_binlog_positions(self):
cursor = self.dbconn.cursor(cursors.DictCursor)
try:
cursor.execute("SHOW MASTER STATUS")
master_status = cursor.fetchone()
cursor.execute("SHOW SLAVE STATUS")
slave_status = cursor.fetchone()
return {
'MASTER': master_status,
'SLAVE': slave_status
}
finally:
cursor.close()
def get_running_processes(self):
cursor = self.dbconn.cursor(cursors.DictCursor)
try:
cursor.execute("SHOW PROCESSLIST")
row = cursor.fetchone()
while row is not None:
if row['Command'] != 'Sleep':
yield {
'User': row['User'],
'Database': row['db'],
'Command': row['Command'],
'RunningTime': row['Time'],
'State': row['State'],
'Query': row['Info']
}
row = cursor.fetchone()
finally:
cursor.close()
def unlock_tables(self):
cursor = self.dbconn.cursor()
cursor.execute("UNLOCK TABLES")
cursor.close()
def discover_volumes(instance_id):
device_mapping = dict()
ec2 = boto3.resource('ec2')
instance = ec2.Instance(id=instance_id)
# fetch block device mappings
block_device_mappings = instance.describe_attribute(Attribute='blockDeviceMapping')['BlockDeviceMappings']
for mapping in block_device_mappings:
if 'Ebs' not in mapping:
continue
disk = mapping['DeviceName'].replace('/dev/sd', '/dev/xvd')
device_mapping[disk] = ec2.Volume(id=mapping['Ebs']['VolumeId'])
# also add support for newer NVMe-style disks
for _, _, filenames in os.walk('/dev/disk/by-id'):
for filename in filenames:
m = re.match('nvme-Amazon_Elastic_Block_Store_vol([\w\d]+)(?:-part\d)?', filename)
if m is None:
continue
disk = os.path.realpath("/dev/disk/by-id/%s" % filename)
device_mapping[disk] = ec2.Volume(id="vol-%s" % m.group(1))
return device_mapping
def discover_mounts():
def discover():
for line in read_file('/proc/mounts').splitlines():
device, mount_point, fs_type, options, _, _ = line.rstrip().split(' ')
yield (mount_point, device)
return dict(discover())
def discover_lvm_volume_groups():
def discover():
for line in subprocess.check_output(['/sbin/pvs', '--separator=|', '--noheading', '-o', 'pv_name,vg_name']).splitlines():
yield line.decode('utf8').strip().split('|')
return dict(
(volume_group, tuple(x[0] for x in items))
for volume_group, items in groupby(discover(), lambda x: x[1])
)
def parse_lvm_mapper_name(name):
if not name.startswith('/dev/mapper/'):
return None
name = name.replace('/dev/mapper/', '')
lv, vg = map(lambda x: x[::-1].replace('--', '-'), name[::-1].split('-', 1))
return vg, lv
def discover_mysql_volumes():
instance_id = fetch_my_instance_id()
volumes = discover_volumes(instance_id)
mounts = discover_mounts()
volume_groups = discover_lvm_volume_groups()
# find devices mounted on /var/lib/mysql* and extract LVM volume groups
for mount, device in mounts.items():
if not mount.startswith('/var/lib/mysql'):
continue
lvm_metadata = parse_lvm_mapper_name(device)
if lvm_metadata is None:
continue
# map the devices to EBS volumes
vg, lv = lvm_metadata
ebs_volumes = tuple(volumes[device] for device in volume_groups[vg])
yield (mount, vg, lv, ebs_volumes)
def create_lvm_snapshot(vg, lv, size='10G', name=None):
if name is None:
name = "%s_snapshot_%i" % (lv, os.getpid())
subprocess.check_call(['/sbin/lvcreate', '--snapshot', '%s/%s' % (vg, lv), '-L%s' % size, '--name', name])
return name
def remove_lvm_snapshot(vg, lv):
subprocess.check_call(['/sbin/lvremove', '-f', '%s/%s' % (vg, lv)])
def write_position_file(positions):
file = tempfile.NamedTemporaryFile(mode='wt', delete=True)
for type, info in positions.items():
if info is None:
continue
for key, value in info.items():
file.write("%s:%s=%s\n" % (type, key, value))
file.flush()
return file
def copy_position_file(file, target):
shutil.copy2(file.name, target)
def main():
dry_run = False
volumes = tuple(discover_mysql_volumes())
hostname = gethostname()
today = datetime.now().strftime("%Y-%m-%d")
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
dbconn = MySQL()
# Describe what we are about to do
print('Going to create LVM & EBS snapshots for the following volumes:')
for mount_point, volume_group, logical_volume, ebses in volumes:
for ebs in ebses:
print('%s -> %s/%s on %s' % (mount_point, volume_group, logical_volume, ebs))
# Go!
start_time = datetime.now()
# Connect with DB and lock tables
print("Connecting to DB...")
dbconn.connect(read_timeout=30)
print("Locking tables...")
try:
dbconn.lock_tables()
except MySQLLockTablesTimeoutError:
dbconn = MySQL()
dbconn.connect()
running_processes = tuple(dbconn.get_running_processes())
raise RuntimeError("Timed out when trying to lock tables, currently active processes: %s" % "\n".join(map(str, running_processes)))
# Create tmp position file
print("Creating position file...")
pos_file = write_position_file(dbconn.get_binlog_positions())
# Create LVM snapshots and unlock tables
snapshots = []
try:
try:
# Create LVM snapshots
for mount_point, volume_group, logical_volume, ebses in volumes:
# Copy position file
copy_position_file(pos_file, mount_point + '/bin.pos')
print("Creating LVM snapshot for %s/%s..." % (volume_group, logical_volume))
snapshot_logical_volume = create_lvm_snapshot(volume_group, logical_volume)
snapshots.append((volume_group, snapshot_logical_volume, ebses))
# Remove position file again after snapshot
os.remove(mount_point + '/bin.pos')
finally:
print("Unlocking tables...")
dbconn.unlock_tables()
print("Tables had been locked for %s!" % str(datetime.now()-start_time))
# Create EBS snapshots
already_snapshotted = set()
for volume_group, logical_volume, ebses in snapshots:
for ebs in ebses:
if ebs.id in already_snapshotted:
continue
print("Creating EBS snapshot for %s (part of %s/%s)" % (ebs.id, volume_group, logical_volume))
ebs.create_snapshot(
DryRun=dry_run,
Description='Automatic backup of %s %s on %s' % (hostname, ebs.id, now),
TagSpecifications=[
{
'ResourceType': 'snapshot',
'Tags': [
{
'Key': 'Name',
'Value': '%s_backup_%s_%s' % (hostname, today, ebs.id)
},
{
'Key': 'Hostname',
'Value': hostname
},
{
'Key': 'Volume',
'Value': ebs.id
}
]
}
]
)
already_snapshotted.add(ebs.id)
finally:
# Remove LVM snapshots
for volume_group, logical_volume, _ in snapshots:
try:
print("Removing LVM snapshot %s/%s..." % (volume_group, logical_volume))
remove_lvm_snapshot(volume_group, logical_volume)
except:
# Print message and continue removing LVM snapshots
print("Removal of LVM snapshot %s/%s failed" % (volume_group, logical_volume))
print_exc()
print("Backup completed. Elapsed time: %s" % str(datetime.now()-start_time))
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment