Last active
March 17, 2022 10:36
-
-
Save mcuelenaere/d826808a0e806b36c1bf44a7b741be6b to your computer and use it in GitHub Desktop.
Script to automate creating a MySQL backup using LVM snapshots, over multiple disks on AWS
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
# -*- coding: utf-8 -*- | |
import boto3 | |
import re | |
import subprocess | |
import os, os.path | |
import tempfile | |
import shutil | |
from datetime import datetime | |
from itertools import groupby | |
from pymysql import connect as mysql_connect, cursors | |
from pymysql.err import OperationalError | |
from socket import gethostname | |
from traceback import print_exc | |
from urllib.request import urlopen | |
def read_file(filename):
    """Return the complete text content of the file at ``filename``."""
    with open(filename, mode='r') as handle:
        contents = handle.read()
    return contents
def read_url(url):
    """Fetch ``url`` and return the raw response body as ``bytes``.

    The response is used as a context manager so the underlying connection
    is closed as soon as the body has been read, instead of lingering until
    garbage collection.
    """
    with urlopen(url) as response:
        return response.read()
def extract_mysql_details():
    """Return the ``(username, password)`` pair found in a MySQL client config.

    ``~/.my.cnf`` and ``/etc/mysql/debian.cnf`` are examined in that order
    (files that do not exist are skipped).  Each file is scanned for
    ``user = ...`` and ``password = ...`` lines; the first file providing
    both values wins.  Credentials are never combined across files.

    Raises:
        RuntimeError: if no existing file contains both a user and a password.
    """
    matcher = re.compile(r'^([\S]+)\s*=\s*(\S+)$')
    candidates = [os.path.expanduser('~/.my.cnf'), '/etc/mysql/debian.cnf']
    paths = [path for path in candidates if os.path.exists(path)]

    for path in paths:
        # Start fresh for every file so a user from one file is never paired
        # with a password from another.
        username, password = None, None
        with open(path, 'r') as f:
            lines = f.read().splitlines()

        for line in lines:
            # Strip first so indented entries or trailing blanks still match.
            m = matcher.match(line.strip())
            if m is None:
                continue

            if m.group(1) == 'user':
                username = m.group(2)
            elif m.group(1) == 'password':
                password = m.group(2)

        # Truthiness check: either value may still be None here, which
        # previously crashed with TypeError on len(None).
        if username and password:
            return (username, password)

    raise RuntimeError("Failed to extract mysql details in paths: %s" % "\n".join(paths))
def fetch_my_instance_id():
    """Return the ID of the EC2 instance this script runs on, as a ``str``.

    The ID is read from the instance metadata endpoint at 169.254.169.254.
    """
    instance_id = read_url('http://169.254.169.254/latest/meta-data/instance-id')
    # The HTTP body arrives as bytes, but the value is later handed to boto3
    # as a resource identifier, which must be text.
    if isinstance(instance_id, bytes):
        instance_id = instance_id.decode('ascii')
    return instance_id.strip()
class MySQLLockTablesTimeoutError(RuntimeError):
    """Signals that taking the global read lock on MySQL timed out."""
class MySQL():
    """Small wrapper around a pymysql connection to the local MySQL server.

    Call ``connect()`` before any other method; the connection is stored in
    ``self.dbconn``.
    """

    def connect(self, read_timeout=None):
        """Connect over the local Unix socket.

        Credentials come from ``extract_mysql_details()``.  ``read_timeout``
        is passed straight through to pymysql's ``connect()``.
        """
        (username, password) = extract_mysql_details()
        self.dbconn = mysql_connect(
            unix_socket='/var/run/mysqld/mysqld.sock',
            user=username,
            passwd=password,
            read_timeout=read_timeout
        )

    def lock_tables(self):
        """Issue ``FLUSH TABLES WITH READ LOCK``.

        Raises:
            MySQLLockTablesTimeoutError: if the statement fails with client
                error 2013, which this script treats as the read timeout
                expiring while waiting for the lock.
            OperationalError: for any other operational failure.
        """
        cursor = self.dbconn.cursor()
        try:
            cursor.execute("FLUSH TABLES WITH READ LOCK")
        except OperationalError as e:
            # 2013 = "Lost connection to MySQL server during query".
            if e.args[0] == 2013:
                raise MySQLLockTablesTimeoutError() from e
            else:
                raise
        finally:
            cursor.close()

    def get_binlog_positions(self):
        """Return ``{'MASTER': row, 'SLAVE': row}`` from the two status queries.

        Each value is the first row of ``SHOW MASTER STATUS`` /
        ``SHOW SLAVE STATUS`` as a dict, or ``None`` when the query returned
        no rows.
        """
        cursor = self.dbconn.cursor(cursors.DictCursor)
        try:
            cursor.execute("SHOW MASTER STATUS")
            master_status = cursor.fetchone()

            cursor.execute("SHOW SLAVE STATUS")
            slave_status = cursor.fetchone()

            return {
                'MASTER': master_status,
                'SLAVE': slave_status
            }
        finally:
            cursor.close()

    def get_running_processes(self):
        """Yield one summary dict per ``SHOW PROCESSLIST`` row that is not sleeping.

        This is a generator: the cursor is closed once it is exhausted or
        closed by the caller.
        """
        cursor = self.dbconn.cursor(cursors.DictCursor)
        try:
            cursor.execute("SHOW PROCESSLIST")
            row = cursor.fetchone()
            while row is not None:
                if row['Command'] != 'Sleep':
                    yield {
                        'User': row['User'],
                        'Database': row['db'],
                        'Command': row['Command'],
                        'RunningTime': row['Time'],
                        'State': row['State'],
                        'Query': row['Info']
                    }
                row = cursor.fetchone()
        finally:
            cursor.close()

    def unlock_tables(self):
        """Issue ``UNLOCK TABLES``, always closing the cursor afterwards."""
        cursor = self.dbconn.cursor()
        try:
            cursor.execute("UNLOCK TABLES")
        finally:
            # Same cleanup guarantee as the other methods, even on failure.
            cursor.close()
def discover_volumes(instance_id):
    """Map local block-device paths to the EBS volumes backing them.

    Two sources are combined:

    * the instance's ``blockDeviceMapping`` attribute, with ``/dev/sd*``
      device names rewritten to ``/dev/xvd*``;
    * ``nvme-Amazon_Elastic_Block_Store_vol...`` links below
      ``/dev/disk/by-id``, resolved to the real device node.

    Returns a dict of device path -> boto3 ``ec2.Volume`` resource.
    """
    device_mapping = dict()
    ec2 = boto3.resource('ec2')
    instance = ec2.Instance(id=instance_id)

    # fetch block device mappings
    block_device_mappings = instance.describe_attribute(Attribute='blockDeviceMapping')['BlockDeviceMappings']
    for mapping in block_device_mappings:
        if 'Ebs' not in mapping:
            continue

        disk = mapping['DeviceName'].replace('/dev/sd', '/dev/xvd')
        device_mapping[disk] = ec2.Volume(id=mapping['Ebs']['VolumeId'])

    # also add support for newer NVMe-style disks
    # Only hex digits are captured so that trailing decorations on the link
    # name (e.g. "-part1" or "_1") never end up inside the volume ID.
    nvme_link = re.compile(r'nvme-Amazon_Elastic_Block_Store_vol([0-9a-fA-F]+)')
    for dirpath, _, filenames in os.walk('/dev/disk/by-id'):
        for filename in filenames:
            m = nvme_link.match(filename)
            if m is None:
                continue

            disk = os.path.realpath(os.path.join(dirpath, filename))
            device_mapping[disk] = ec2.Volume(id="vol-%s" % m.group(1))

    return device_mapping
def discover_mounts():
    """Return a dict of mount point -> device, parsed from ``/proc/mounts``.

    Every line must consist of exactly six space-separated fields; when a
    mount point appears more than once, the last entry wins.
    """
    mounts = {}
    for entry in read_file('/proc/mounts').splitlines():
        device, mount_point, _fs_type, _options, _dump, _passno = entry.rstrip().split(' ')
        mounts[mount_point] = device
    return mounts
def discover_lvm_volume_groups():
    """Return a dict of LVM volume group name -> tuple of its physical volumes.

    The data comes from ``/sbin/pvs``, which prints one ``pv_name|vg_name``
    pair per line.

    Raises:
        subprocess.CalledProcessError: if ``pvs`` exits with a non-zero status.
    """
    output = subprocess.check_output(['/sbin/pvs', '--separator=|', '--noheading', '-o', 'pv_name,vg_name'])

    # Accumulate per volume group instead of using itertools.groupby: groupby
    # only merges *adjacent* rows, so PVs of one VG that are not listed next
    # to each other would overwrite one another in the resulting dict.
    members = {}
    for line in output.splitlines():
        fields = line.decode('utf8').strip().split('|')
        if len(fields) < 2:
            # blank or malformed line, nothing to record
            continue
        pv_name, vg_name = fields[0], fields[1]
        members.setdefault(vg_name, []).append(pv_name)

    return {vg_name: tuple(pv_names) for vg_name, pv_names in members.items()}
def parse_lvm_mapper_name(name):
    """Split a ``/dev/mapper/<vg>-<lv>`` path into ``(vg, lv)``.

    In a device-mapper name a hyphen that belongs to the VG or LV name is
    written as ``--`` and a single ``-`` separates the two.  The name is
    scanned left to right so that hyphens inside either part are handled.

    Returns ``None`` when ``name`` is not below ``/dev/mapper/`` or contains
    no VG/LV separator (e.g. ``/dev/mapper/control``).
    """
    prefix = '/dev/mapper/'
    if not name.startswith(prefix):
        return None

    mapped = name[len(prefix):]
    pos = 0
    while pos < len(mapped):
        if mapped[pos] != '-':
            pos += 1
        elif mapped.startswith('--', pos):
            # escaped hyphen that is part of a name, skip both characters
            pos += 2
        else:
            # lone hyphen: this is the VG/LV separator
            break
    else:
        return None

    vg, lv = mapped[:pos], mapped[pos + 1:]
    if not vg or not lv:
        return None
    return vg.replace('--', '-'), lv.replace('--', '-')
def discover_mysql_volumes():
    """Yield ``(mount, vg, lv, ebs_volumes)`` for each LVM-backed MySQL mount.

    A mount qualifies when its mount point starts with ``/var/lib/mysql`` and
    its device is an LVM ``/dev/mapper`` name.  ``ebs_volumes`` is a tuple
    holding the EBS volume of every physical volume in the volume group.
    """
    ebs_by_device = discover_volumes(fetch_my_instance_id())
    device_by_mount = discover_mounts()
    pvs_by_group = discover_lvm_volume_groups()

    for mount_point, mapper_device in device_by_mount.items():
        # only devices mounted on /var/lib/mysql* are of interest
        if mount_point.startswith('/var/lib/mysql'):
            parsed = parse_lvm_mapper_name(mapper_device)
            if parsed is not None:
                vg, lv = parsed
                # translate the group's physical volumes into EBS volumes
                ebs_volumes = tuple(ebs_by_device[pv] for pv in pvs_by_group[vg])
                yield (mount_point, vg, lv, ebs_volumes)
def create_lvm_snapshot(vg, lv, size='10G', name=None):
    """Create an LVM snapshot of ``vg/lv`` via ``lvcreate`` and return its name.

    ``size`` is handed to ``lvcreate -L``.  When ``name`` is omitted, the
    snapshot is called ``<lv>_snapshot_<pid of this process>``.
    """
    snapshot_name = name if name is not None else "%s_snapshot_%i" % (lv, os.getpid())
    origin = '%s/%s' % (vg, lv)
    command = ['/sbin/lvcreate', '--snapshot', origin, '-L%s' % size, '--name', snapshot_name]
    subprocess.check_call(command)
    return snapshot_name
def remove_lvm_snapshot(vg, lv):
    """Remove the logical volume ``vg/lv`` with ``lvremove -f``."""
    target = '%s/%s' % (vg, lv)
    subprocess.check_call(['/sbin/lvremove', '-f', target])
def write_position_file(positions):
    """Write binlog positions to a temporary file and return the open file.

    ``positions`` maps a label to a dict of fields (or ``None``, which is
    skipped).  Each field becomes one ``<label>:<key>=<value>`` line.  The
    returned ``NamedTemporaryFile`` is flushed and is deleted when closed.
    """
    handle = tempfile.NamedTemporaryFile(mode='wt', delete=True)
    for label, fields in positions.items():
        if fields is not None:
            handle.writelines(
                "%s:%s=%s\n" % (label, key, value) for key, value in fields.items()
            )
    handle.flush()
    return handle
def copy_position_file(file, target):
    """Copy the file behind ``file`` (located via ``file.name``) to ``target``."""
    source_path = file.name
    shutil.copy2(source_path, target)
def main():
    """Create a consistent backup of the MySQL data volumes.

    Steps:

    1. Discover the LVM volumes mounted below ``/var/lib/mysql*`` and the EBS
       volumes backing them.
    2. Take a global read lock on MySQL and record the binlog positions.
    3. For every volume, drop a ``bin.pos`` file into the mount point, take an
       LVM snapshot (so the file is part of it) and remove the file again.
    4. Unlock the tables.
    5. Create one EBS snapshot per distinct EBS volume.
    6. Remove the LVM snapshots, whether or not the earlier steps succeeded.

    Raises:
        RuntimeError: if the table lock cannot be acquired in time; the
            message lists the processes that were active at that moment.
    """
    dry_run = False
    volumes = tuple(discover_mysql_volumes())
    hostname = gethostname()
    today = datetime.now().strftime("%Y-%m-%d")
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    dbconn = MySQL()

    # Describe what we are about to do
    print('Going to create LVM & EBS snapshots for the following volumes:')
    for mount_point, volume_group, logical_volume, ebses in volumes:
        for ebs in ebses:
            print('%s -> %s/%s on %s' % (mount_point, volume_group, logical_volume, ebs))

    # Go!
    start_time = datetime.now()

    # Connect with DB and lock tables
    print("Connecting to DB...")
    dbconn.connect(read_timeout=30)

    print("Locking tables...")
    try:
        dbconn.lock_tables()
    except MySQLLockTablesTimeoutError:
        # The timed-out connection is not reused; open a fresh one to report
        # what was running.
        dbconn = MySQL()
        dbconn.connect()
        running_processes = tuple(dbconn.get_running_processes())
        raise RuntimeError("Timed out when trying to lock tables, currently active processes: %s" % "\n".join(map(str, running_processes)))

    # Create LVM snapshots and unlock tables
    snapshots = []
    pos_file = None
    try:
        try:
            # Create tmp position file (inside the try so that a failure
            # here still unlocks the tables)
            print("Creating position file...")
            pos_file = write_position_file(dbconn.get_binlog_positions())

            # Create LVM snapshots
            for mount_point, volume_group, logical_volume, ebses in volumes:
                # Copy position file
                pos_path = mount_point + '/bin.pos'
                copy_position_file(pos_file, pos_path)
                try:
                    print("Creating LVM snapshot for %s/%s..." % (volume_group, logical_volume))
                    snapshot_logical_volume = create_lvm_snapshot(volume_group, logical_volume)
                    snapshots.append((volume_group, snapshot_logical_volume, ebses))
                finally:
                    # Remove position file again after snapshot, also when
                    # the snapshot failed, so it never lingers in the data dir
                    os.remove(pos_path)
        finally:
            print("Unlocking tables...")
            dbconn.unlock_tables()
            print("Tables had been locked for %s!" % str(datetime.now()-start_time))

        # Create EBS snapshots
        already_snapshotted = set()
        for volume_group, logical_volume, ebses in snapshots:
            for ebs in ebses:
                # An EBS volume can back several logical volumes; snapshot it once
                if ebs.id in already_snapshotted:
                    continue

                print("Creating EBS snapshot for %s (part of %s/%s)" % (ebs.id, volume_group, logical_volume))
                ebs.create_snapshot(
                    DryRun=dry_run,
                    Description='Automatic backup of %s %s on %s' % (hostname, ebs.id, now),
                    TagSpecifications=[
                        {
                            'ResourceType': 'snapshot',
                            'Tags': [
                                {
                                    'Key': 'Name',
                                    'Value': '%s_backup_%s_%s' % (hostname, today, ebs.id)
                                },
                                {
                                    'Key': 'Hostname',
                                    'Value': hostname
                                },
                                {
                                    'Key': 'Volume',
                                    'Value': ebs.id
                                }
                            ]
                        }
                    ]
                )
                already_snapshotted.add(ebs.id)
    finally:
        # Closing the temporary position file also deletes it
        if pos_file is not None:
            pos_file.close()

        # Remove LVM snapshots
        for volume_group, logical_volume, _ in snapshots:
            try:
                print("Removing LVM snapshot %s/%s..." % (volume_group, logical_volume))
                remove_lvm_snapshot(volume_group, logical_volume)
            except Exception:
                # Print message and continue removing LVM snapshots
                print("Removal of LVM snapshot %s/%s failed" % (volume_group, logical_volume))
                print_exc()

    print("Backup completed. Elapsed time: %s" % str(datetime.now()-start_time))
# Run the backup only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment