Skip to content

Instantly share code, notes, and snippets.

@lobeck
Created February 22, 2021 08:50
Show Gist options
  • Save lobeck/650e2a8392ba84e66af9f76b7c644a5b to your computer and use it in GitHub Desktop.
Save lobeck/650e2a8392ba84e66af9f76b7c644a5b to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""
Create and mount an LVM snapshot of the volume behind a path, or unmount and remove it
"""
import argparse
import os
import subprocess
import string
import re
# Environment handed to every external command this script launches.
# NOTE: myenv is the very same object as os.environ, so pinning PATH here
# also changes the search path of the running process itself.
myenv = os.environ
myenv.update(PATH="/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/bin/X11")
# ======================================================================================================================
# start argument definition
# ======================================================================================================================
# Command line interface: two optional mode switches plus the mandatory path
# whose backing volume is to be handled.
parser = argparse.ArgumentParser(description='Create and mount LVM Snapshots')
parser.add_argument(
    '--create', '-c',
    action='store_true',
    dest='create_snapshot',
    help='create and mount snapshot',
)
parser.add_argument(
    '--unmount', '-u',
    action='store_true',
    dest='destroy_snapshot',
    help='unmount and destroy snapshot',
)
parser.add_argument('path')
# parsing happens at module level, i.e. as soon as the script is loaded
result = parser.parse_args()
# ======================================================================================================================
# end argument definition
# ======================================================================================================================
# Plain dictionary copy of the parsed options, keyed by argparse destination name.
configuredValues = dict(vars(result))
def execute_system_command(command, printOutput=True, env=None,
                           stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE):
    """
    Helper to execute system commands with output

    :param command: the command and its arguments as a list or tuple
    :param printOutput: when true, the captured output is printed via print_command_output
    :param env: environment mapping for the child process; None (the default) selects
                the module-level ``myenv``
    :param stdin: optional data written to the child's standard input (bytes, or str
                  which is encoded as UTF-8)
    :param stdout: stdout parameter passed to subprocess.Popen
    :param stderr: stderr parameter passed to subprocess.Popen
    :return: the ``(stdout, stderr)`` tuple returned by ``Popen.communicate``
    :raise: SystemExit with code 1 if the command's return code is non-zero
    """
    # resolve the default at call time instead of freezing it when the function is defined
    if env is None:
        env = myenv
    # the pipe is opened in binary mode, so text input has to be encoded first
    if isinstance(stdin, str):
        stdin = stdin.encode('utf-8')
    process = subprocess.Popen(command, env=env, stdin=subprocess.PIPE, stdout=stdout, stderr=stderr)
    output = process.communicate(input=stdin)
    # print command output - is passed to networker
    if printOutput:
        print_command_output(output)
    # check if the command finished successfully (value comparison, not identity)
    if process.returncode != 0:
        # wrap in a 1-tuple so a tuple command is formatted as one value
        print("command returned non zero - stopping - %s" % (command,))
        print("failed.")  # tell networker about fail
        # same exception the interactive exit() helper raises, without depending on site
        raise SystemExit(1)
    return output
def print_command_output(output):
    """
    Print the captured streams of a finished command, one stream per line

    Each entry is decoded as UTF-8 (undecodable bytes are replaced), stripped of
    surrounding whitespace and skipped when nothing is left.

    :param output: iterable of byte strings, e.g. the ``(stdout, stderr)`` tuple returned
                   by ``Popen.communicate``; None entries (streams that were not captured)
                   are ignored
    """
    decoded = (stream.decode('utf-8', errors='replace').strip()
               for stream in output if stream is not None)
    print("\n".join(text for text in decoded if text))
def get_logical_volume_map():
    """
    Helper function to get available logical volumes and their size using lvs

    :return: a dictionary in the format dict[major][minor] = (vg, lv, size), where size
             is the number lvs prints for lv_size when called with ``--units g``
    """
    lvsCommand = ["lvs", "--noheadings", "--units", "g",
                  "-o", "lv_name,vg_name,lv_kernel_major,lv_kernel_minor,lv_size"]
    # the context manager closes the pipe and waits for lvs to exit, so the child
    # process is always reaped
    with subprocess.Popen(lvsCommand, stdout=subprocess.PIPE) as process:
        rawOutput = process.communicate()[0]
    logicalVolumeMap = {}
    for line in rawOutput.decode("utf-8").splitlines():
        fields = line.split()
        # skip blank or truncated lines instead of failing with an IndexError
        if len(fields) < 5:
            continue
        logicalVolume = fields[0]
        volumeGroup = fields[1]
        major = int(fields[2])
        minor = int(fields[3])
        # keep only digits and the decimal point: drops the unit suffix and any other
        # decoration around the number
        # NOTE(review): some lvs versions appear to prefix rounded sizes with "<" - confirm
        size = float(re.sub(r'[^0-9.]', '', fields[4]))
        logicalVolumeMap.setdefault(major, {})[minor] = (volumeGroup, logicalVolume, size)
    return logicalVolumeMap
def which(command):
    """
    Locate an executable on the current PATH

    Unlike a plain existence check this ignores directories and files without
    execute permission.

    :param command: name of the program to look for
    :return: path of the first matching executable, or None if there is none
    """
    # imported locally because shutil is not among this file's top-level imports
    import shutil
    return shutil.which(command)
def get_mount_point(path):
    """
    Find the mount point of the filesystem that contains a path

    :param path: any existing path (relative paths are resolved against the current directory)
    :return: the closest ancestor of the resolved path for which os.path.ismount is true
    """
    # resolve symlinks first: os.path.ismount never reports a symlink as a mount point,
    # so walking up the unresolved path would yield the filesystem holding the link
    # rather than the one holding its target
    path = os.path.realpath(path)
    while not os.path.ismount(path):
        path = os.path.dirname(path)
    return path
def create_snapshot(volume_group, logical_volume, logical_volume_size):
    """
    Create an LVM snapshot named "<logical_volume>_snap" of the given logical volume

    :param volume_group: name of the volume group containing the origin volume
    :param logical_volume: name of the origin logical volume
    :param logical_volume_size: size of the origin volume; the snapshot is given one
                                tenth of this value, passed to lvcreate with a "g" suffix
    """
    snapshot_size = "%sg" % (logical_volume_size / 10)
    snapshot_name = "%s_snap" % logical_volume
    origin = "%s/%s" % (volume_group, logical_volume)
    arguments = ["lvcreate", "-s", "-L", snapshot_size, "-n", snapshot_name, origin]
    # show the exact command line before running it
    print("Using Snapshot command: %s " % (" ".join(arguments)))
    execute_system_command(arguments)
def mount_snapshot(mount_point):
    """
    Mount the snapshot device on the given directory

    Reads two module-level names set by the main script flow:
    ``snapshot_directory_path`` (container directory for all snapshot mounts) and
    ``snapshotPath`` (device path of the snapshot to mount).

    :param mount_point: directory on which the snapshot is mounted; created if missing
    """
    # make sure the "container" path and the mount point itself exist
    os.makedirs(snapshot_directory_path, exist_ok=True)
    os.makedirs(mount_point, exist_ok=True)
    # a leftover mount on the target directory has to go before mounting again
    if os.path.ismount(mount_point):
        print("mount point %s is still active - umount" % mount_point)
        umountCommand = ["umount", mount_point]
        # run the umount command itself, not the bare directory path
        execute_system_command(umountCommand)
    snapshotMountCommand = ["mount", snapshotPath, mount_point]
    print(snapshotMountCommand)
    execute_system_command(snapshotMountCommand)
# ----------------------------------------------------------------------------------------
# main flow: find the logical volume behind the given path, then create and mount its
# snapshot and/or unmount and remove it, depending on the command line switches
# ----------------------------------------------------------------------------------------
path = configuredValues["path"]
# major and minor device number of the filesystem that holds the path
device = os.stat(path).st_dev
major, minor = os.major(device), os.minor(device)
logicalVolumeMap = get_logical_volume_map()
isLogicalVolume = 1 if (major in logicalVolumeMap and minor in logicalVolumeMap[major]) else 0
if not isLogicalVolume:
    print("Found no logical volume for %s - continuing without snapshot" % path)
else:
    # details of the logical volume backing the path
    volumeGroup, logicalVolume, logicalVolumeSize = logicalVolumeMap[major][minor]
    snapshot_directory_path = '/home/backup/snapshots'
    # device path of the snapshot (it may not exist yet)
    snapshotPath = "/dev/%s/%s_snap" % (volumeGroup, logicalVolume)
    mount_point = get_mount_point(path)
    # the snapshot is mounted below the container directory, mirroring the original mount point
    snapshotMountPoint = "%s%s" % (snapshot_directory_path, mount_point)
    if configuredValues['create_snapshot']:
        create_snapshot(volumeGroup, logicalVolume, logicalVolumeSize)
        mount_snapshot(snapshotMountPoint)
    if configuredValues['destroy_snapshot']:
        # unmount first, then drop the snapshot volume
        umountCommand = ["umount", snapshotMountPoint]
        print("umount snapshot: %s" % umountCommand)
        execute_system_command(umountCommand)
        snapshotRemoveCommand = ["lvremove", "-f", snapshotPath]
        print("remove snapshot: %s" % snapshotRemoveCommand)
        execute_system_command(snapshotRemoveCommand)
# try to find mysqld
#mysqldBinary = which("mysqld")
#mysqlBackupDatadir = "%s/mysql" % snapshotMountPoint
# check if mysqld binary exists and snapshot contains a mysql/mysql subfolder -> containing the mysql system tables
#if mysqldBinary and os.path.isdir("%s/mysql" % mysqlBackupDatadir):
# print "found mysqld - starting recovery"
# mysqlBackupSocket = "/var/run/mysqld/mysqld-backup.sock"
# mysqlBackupPid = "/var/run/mysqld/mysqld-backup.pid"
# mysqlBackupLogfileSize = os.path.getsize("%s/ib_logfile0" % mysqlBackupDatadir)
# mysqldCommand = [mysqldBinary, "--socket", mysqlBackupSocket, "--pid-file", mysqlBackupPid,
# "--skip-networking", "--skip-grant-tables", "--bootstrap", "--skip-slave-start",
# "--innodb-buffer-pool-size", "128M", "--innodb-log-file-size", str(mysqlBackupLogfileSize),
# "--datadir", mysqlBackupDatadir]
# print mysqldCommand
# mysqlBackupStdin = "select 1;"
# execute_system_command(mysqldCommand, stdin=mysqlBackupStdin)
#echo 'select 1;' |
#
# --skip-slave-start =134217728
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment