Skip to content

Instantly share code, notes, and snippets.

@socketpair
Created November 6, 2023 19:14
Show Gist options
  • Save socketpair/5432adadc9c7f50a01e5837a85cc6495 to your computer and use it in GitHub Desktop.
Save socketpair/5432adadc9c7f50a01e5837a85cc6495 to your computer and use it in GitHub Desktop.
XFS snapshot
import argparse
import json
import logging
import os
from contextlib import contextmanager
from errno import EBUSY, EINVAL
from fcntl import LOCK_EX, flock, ioctl
from os import O_CLOEXEC, O_DIRECTORY, O_RDONLY, close, open as os_open
from pathlib import Path
from shutil import rmtree
from signal import SIGKILL
from subprocess import CalledProcessError, Popen, TimeoutExpired, call, check_call, check_output
from tempfile import TemporaryDirectory
from time import monotonic, sleep
log = logging.getLogger(__name__)

# ioctl request numbers from <linux/fs.h>:
#   FIFREEZE = _IOWR('X', 119, int)  -> 0xC0045877
#   FITHAW   = _IOWR('X', 120, int)  -> 0xC0045878
FIFREEZE = 0xC004_5877
FITHAW = 0xC004_5878
@contextmanager
def _measure(operation: str):
    """Log at DEBUG level how long the body of the ``with`` block took.

    The duration is logged even when the body raises.
    """
    began_at = monotonic()
    try:
        yield
    finally:
        elapsed = monotonic() - began_at
        log.debug('Operation "%s" completed in %2.2f seconds.', operation, elapsed)
@contextmanager
def _frozen(path: Path):
    """Freeze the filesystem containing *path* (FIFREEZE) for the duration of the ``with`` block.

    Yields an ``unfreeze()`` callable so the caller can thaw early (for example before
    waiting on a process blocked on the frozen FS). Calling it again is harmless, and it
    is always called once more on exit. The directory descriptor used for the ioctls is
    always closed.

    If FIFREEZE itself fails or is interrupted, a best-effort thaw is attempted before
    the original error propagates. The only exception is EBUSY, which the kernel returns
    when the filesystem is already frozen, i.e. by someone else.
    """
    fd = os_open(path, O_RDONLY | O_DIRECTORY | O_CLOEXEC)

    def unfreeze() -> None:
        log.debug('Unfreezing')
        try:
            ioctl(fd, FITHAW)
        except OSError as err:
            # EINVAL: the filesystem is not frozen (e.g. already thawed) -- thawing is idempotent.
            if err.errno != EINVAL:
                raise

    try:
        log.debug('Freezing')
        try:
            ioctl(fd, FIFREEZE)
        except BaseException as freeze_err:
            # KERNEL BUG: if a signal (e.g. SIGINT) arrives during FIFREEZE, the FS can be left
            # frozen(!). That is why a thaw must be attempted even after a failed freeze.
            # EBUSY means the FS was already frozen before this call; thawing it would break
            # someone else's freeze.
            if not (isinstance(freeze_err, OSError) and freeze_err.errno == EBUSY):
                try:
                    ioctl(fd, FITHAW)
                except OSError:
                    pass  # best effort only: the freeze error re-raised below is the one that matters
            raise
        try:
            yield unfreeze
        finally:
            unfreeze()
    finally:
        close(fd)
_RSYNC_ARGS = [
'rsync',
'-a',
# '--checksum-choice=xxh128', # потому что '--only-write-batch' сбрасывается на SLOW MD5!
# Алгоритм случайного изменения контрольной суммы не работает.
'--inplace',
'--hard-links',
'--acls',
'--xattrs',
'--one-file-system',
'--delete',
'--numeric-ids',
'--preallocate',
'--trust-sender',
]
def _make_reflink_copy(source: Path, destination: Path) -> None:
    """Recreate *destination* as a reflink (copy-on-write) clone of *source*.

    Any existing *destination* tree is deleted first. Then ``cp --reflink=always`` is
    run, up to 19 times, until it exits with status 0. The source directory may be
    changing under the copy, so a non-zero exit is expected and simply retried.
    If every attempt fails, a warning is logged and the function still returns
    normally, possibly leaving an incomplete copy.
    """
    log.debug('Removing snapshot-copy as %s', destination)
    if destination.exists():
        with _measure('unlink destination'):
            rmtree(destination)
    destination.mkdir(parents=True)

    # -u: do not re-copy a file whose destination copy is not older; makes retries cheap.
    cp_command = [
        'cp', '-u', '-a', '--reflink=always', '--no-target-directory', '--one-file-system',
        source, destination,
    ]
    max_attempts = 19
    for attempt_no in range(1, max_attempts + 1):
        log.debug('Reflinking')
        with _measure('reflink copy'):
            exit_status = call(cp_command)
        if exit_status == 0:
            return
        log.info('Reflink failed. Attempt: %d. Retrying', attempt_no)
    log.warning('Reflink copy is not complete. High disk load ?')
def _stuck_in_xfs_free_eofblocks(pid: int) -> bool:
    """Return True if process *pid* appears blocked in ``xfs_free_eofblocks``.

    Heuristic: the process has fd 3 open, and its kernel stack mentions
    ``xfs_free_eofblocks``. An observed stack of such a hang::

        [<0>] percpu_rwsem_wait+0x116/0x140
        [<0>] xfs_trans_alloc+0x20c/0x220 [xfs]
        [<0>] xfs_free_eofblocks+0x83/0x120 [xfs]
        [<0>] xfs_release+0x143/0x180 [xfs]
        [<0>] __fput+0x8e/0x250
        [<0>] task_work_run+0x5a/0x90
        [<0>] exit_to_user_mode_prepare+0x1e6/0x1f0
        [<0>] syscall_exit_to_user_mode+0x1b/0x40
        [<0>] do_syscall_64+0x6b/0x90
        [<0>] entry_SYSCALL_64_after_hwframe+0x72/0xdc

    Apparently, closing a file starts an XFS transaction that waits for the freeze to end.
    """
    if not Path(f'/proc/{pid}/fd/3').exists():
        # NOTE(review): why fd 3 specifically is not explained; presumably the first file
        # rsync opens after stdin/stdout/stderr.
        return False
    try:
        stack = Path(f'/proc/{pid}/stack').read_text()
    except OSError:
        # The process may exit between the check above and this read (even successfully),
        # which surfaces as ENOENT. Treat it as "not hung", so the caller's wait()/deadline
        # loop observes the exit instead of killing rsync and aborting a good attempt.
        return False
    return 'xfs_free_eofblocks' in stack


def _atomic_freeze(source: Path, destination: Path, *, freeze_timeout: int, show_changes: bool) -> None:
    """Bring *destination* in line with a point-in-time view of *source*.

    1. Freeze the filesystem holding *source*. Under the freeze, run
       ``rsync --only-write-batch``, which records the source->destination differences
       in a batch file without touching *destination*.
    2. Thaw, then replay the batch onto *destination* with ``rsync --read-batch``.

    Raises:
        RuntimeError: rsync ran longer than *freeze_timeout* seconds under the freeze,
            or it was detected hanging in ``xfs_free_eofblocks``.
        CalledProcessError: either rsync invocation exited with a non-zero status.
    """
    # NOTE(review): the batch file lives in the default temporary directory. If that is on
    # the filesystem being frozen, rsync's writes to it would block until the deadline.
    with TemporaryDirectory() as tmpdir:
        batch = Path(tmpdir) / 'batch'
        with _frozen(source) as unfreeze:
            with _measure('Rsync on frozen FS'):
                log.debug('Running rsync on frozen FS to create batch')
                with Popen(  # pylint: disable=subprocess-popen-preexec-fn
                    _RSYNC_ARGS
                    + [
                        *(['--itemize-changes'] if show_changes else []),
                        '--only-write-batch', batch,
                        '--',
                        f'{source}/',
                        f'{destination}/',
                    ],
                    # Own session/process group, so killpg() below also reaches rsync's children.
                    start_new_session=True,
                ) as proc:
                    try:
                        deadline = monotonic() + freeze_timeout
                        while proc.returncode is None and monotonic() < deadline:
                            try:
                                # proc.wait() may be interrupted by SIGINT.
                                proc.wait(0.1)
                            except TimeoutExpired:
                                if _stuck_in_xfs_free_eofblocks(proc.pid):
                                    log.debug('XFS hang detected')
                                    raise RuntimeError('Early DETECTED XFS HANG') from None
                        if proc.returncode is None:
                            log.debug('rsync timed out')
                            batch_size = batch.stat().st_size if batch.is_file() else 0
                            raise RuntimeError(
                                f'Rsync works too long (more than {freeze_timeout} sec). '
                                f'Batch size is {batch_size}, Aborting.'
                            )
                        log.debug('rsync finished with code %d.', proc.returncode)
                    except:  # noqa: E722 -- also catch KeyboardInterrupt, like subprocess.check_call()
                        log.debug('Killing rsync')
                        # Kill first, thaw second. If we thawed first, rsync could complete
                        # successfully before the SIGKILL arrives.
                        # If rsync somehow got unstuck and exited successfully by the time we
                        # kill it, the kill does nothing: the process is dead but NOT YET REAPED,
                        # so its group still exists.
                        try:
                            os.killpg(proc.pid, SIGKILL)
                        except ProcessLookupError:
                            # Already reaped by proc.wait() (e.g. SIGINT arrived right after it
                            # returned) and no group members left: nothing to kill. Do not let
                            # this mask the exception being handled.
                            pass
                        # Thaw BEFORE the wait() in Popen.__exit__(): an rsync blocked on the
                        # frozen FS may not die until the FS is thawed, so waiting first could hang.
                        unfreeze()
                        raise
                log.debug('rsync finally waited')
        log.debug('Unfrozen')
        assert proc.returncode is not None
        if proc.returncode != 0:
            log.debug('rsync has failed')
            raise CalledProcessError(proc.returncode, proc.args)
        log.debug('Rsync success. Applying batch of size: %2.2f MB', batch.stat().st_size / 1_000_000)
        with _measure('apply patch'):
            check_call(
                _RSYNC_ARGS
                + [
                    '--read-batch', batch,
                    '--',
                    f'{destination}/',
                ],
            )
        log.debug('Patch applied')
def _atomic_freeze_wrapper(source: Path, destination: Path, *, freeze_timeout: int, freeze_attempts: int, show_changes: bool) -> None:
for attempt in range(1, freeze_attempts + 1):
try:
_make_reflink_copy(source, destination)
_atomic_freeze(source, destination, freeze_timeout=freeze_timeout, show_changes=show_changes)
return
except Exception as err:
log.debug('Freeze copy failure. Attempt: %s. Error: %s', attempt, err)
log.debug('Sleeping for %d secs...', freeze_timeout)
sleep(freeze_timeout) # give system time to recover after long freeze
raise RuntimeError('Failed to create atomic snapshot using FSFREEZE.')
_SNAP_LV_NAME = 'atomic_fs_copy'
_SNAP_LV_TAG = 'atomic_fs_copy'
def _atomic_lvsnap(source: Path, destination: Path, *, show_changes: bool) -> None:
match json.loads(check_output(['findmnt', '-o', 'maj:min,fstype,target', '--nofsroot', '--json', '--target', source])):
case {'filesystems': [{'maj:min': str() as device_number, 'fstype': str() as fs_type, 'target': str() as root_mount}]}:
dev_maj, dev_min = device_number.split(':')
case _:
raise ValueError('Failed to parse findmnt result')
# Actually, should work on any FS.
# if fs_type != 'xfs':
# raise RuntimeError(f'Filesystem, type is not XFS: {fs_type}. Something went wrong.')
match json.loads(check_output([
'lvs',
'--select', f'lv_kernel_major={dev_maj} && lv_kernel_minor={dev_min}',
'-o', 'vg_name,lv_name',
'--reportformat=json',
])):
case {'report': [{'lv': [{'vg_name': str() as vg_name, 'lv_name': str() as src_lv_name}]}]}:
pass
case _:
raise ValueError('Failed to parse lvs result')
lvm_snap_mount_dir = Path('/run', _SNAP_LV_NAME)
if lvm_snap_mount_dir.is_mount():
log.warning('Snapshot was mounted. unmounting.')
check_call(['umount', lvm_snap_mount_dir])
if lvm_snap_mount_dir.exists():
rmtree(lvm_snap_mount_dir)
lvm_snap_mount_dir.mkdir()
# Will not exit with error if there are no such LVMs.
log.debug('Removing temporary LVMs if any.')
check_call(['lvremove', '--autobackup', 'n', '-y', '--select', f'lv_tags={_SNAP_LV_TAG}'])
check_call([
'lvcreate',
'--snapshot',
'--addtag', _SNAP_LV_TAG,
'--extents', '100%FREE',
'--name', _SNAP_LV_NAME,
'--autobackup', 'n',
f'{vg_name}/{src_lv_name}',
])
try:
snapshot_blockdev = Path(f'/dev/mapper/{vg_name}-{_SNAP_LV_NAME}')
log.debug('mounting snapshot %s to %s', snapshot_blockdev, lvm_snap_mount_dir)
with _measure('snapshot mounting'):
check_call(['mount', '-t', fs_type, '-o', 'ro,nouuid,norecovery', snapshot_blockdev, lvm_snap_mount_dir])
try:
src_snap_dir = lvm_snap_mount_dir / source.relative_to(root_mount)
if not src_snap_dir.exists():
raise RuntimeError('No same src dir on LVM snap FS. Should never happen.')
log.debug('Calling rsync %s -> %s', src_snap_dir, destination)
with _measure('rsync from snapshot'):
check_call(
_RSYNC_ARGS
+ [
*(['--itemize-changes'] if show_changes else []),
'--',
f'{src_snap_dir}/', # Закрывающий / в rsync для исходного каталога важен.
f'{destination}/',
],
)
finally:
log.debug('unmounting')
check_call(['umount', lvm_snap_mount_dir])
finally:
log.debug('lvremove snapshot')
check_call(['lvremove', '--autobackup', 'n', '-y', f'{vg_name}/{_SNAP_LV_NAME}'])
def _prepare() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Atomic Copy folder")
parser.add_argument('--debug', action='store_true', help='Enable debug mode.')
parser.add_argument("--method", type=str, choices=['freeze', 'lvmsnap', 'hybrid'], help="Type of the operation")
parser.add_argument(
"--freeze-timeout",
type=int,
help="Maximal time under FS freeze in one iteration. Ror 'freeze' or 'hybrid' methods",
# ICS-30307 Максимальное время заморозки fs 5 секунд. При изменении учесть _CONNECTION_LOST_DEADLINE.
default=5,
metavar='SECONDS',
)
parser.add_argument(
"--freeze-attempts",
type=int,
help="Max attempts to create snapshot. For 'freeze' or 'hybrid' methods",
default=5,
metavar='NUMBER',
)
parser.add_argument('--show-changes', action='store_true', help='Show changes while rsync is working.')
parser.add_argument("source", type=Path, help="Source directory path")
parser.add_argument("destination", type=Path, help="Destination directory path")
args = parser.parse_args()
return args
def main() -> None:
    """Command-line entry point: atomically copy ``source`` into ``destination``.

    ``--method``:
    - 'freeze': uses only the FSFREEZE + rsync-batch path.
    - 'lvmsnap': uses only an LVM snapshot.
    - anything else (the 'hybrid' default): tries FSFREEZE first and falls back to
      the LVM snapshot.

    Arguments are validated before any copying. If the copy itself fails, the
    (possibly partial) destination tree is removed and the error is re-raised.

    Raises:
        ValueError: invalid source/destination combination. Nothing is deleted in that case.
    """
    args = _prepare()
    logging.basicConfig()
    logging.getLogger().setLevel(logging.DEBUG if args.debug else logging.INFO)
    logging.raiseExceptions = False
    logging.captureWarnings(True)
    # Exclusive lock on this script file. The descriptor is deliberately never closed, so
    # the lock is held until the process exits (presumably to serialize concurrent runs).
    flock(os.open(__file__, O_RDONLY | O_CLOEXEC), LOCK_EX)

    args.source = args.source.resolve()
    args.destination = args.destination.resolve()

    # Validate BEFORE entering the cleanup block below. Otherwise a rejected combination
    # (e.g. destination == source, or destination inside source) would reach the
    # `except` handler and delete `destination`, i.e. the user's data.
    if not args.source.is_dir():
        raise ValueError(f'Source path {args.source} does not exist or is not a dir.')
    if args.source.is_relative_to(args.destination):
        raise ValueError('Impossible combination of dirs')
    if args.destination.is_relative_to(args.source):
        raise ValueError('Impossible combination of dirs')
    # NOTE: this cannot tell an overlayfs mount from its upperdir; they report the same fsid.
    if os.statvfs(args.source).f_fsid != os.statvfs(args.destination.parent).f_fsid:
        raise ValueError('Source and destination are on different FS.')
    # os.statvfs() does not expose f_type, so the filesystem type (XFS) cannot be checked here:
    # https://stackoverflow.com/questions/48319246/how-can-i-determine-filesystem-type-name-with-linux-api-for-c

    try:
        if args.method != 'lvmsnap':
            log.info('Using fast FSFREEZE method.')
            try:
                _atomic_freeze_wrapper(
                    args.source,
                    args.destination,
                    freeze_timeout=args.freeze_timeout,
                    freeze_attempts=args.freeze_attempts,
                    show_changes=args.show_changes,
                )
                return
            except Exception as exc:
                if args.method == 'freeze':
                    raise
                log.warning('Fast FSFREEZE method failed: %s', exc)
        log.info('Using slower LVM snap method.')
        _make_reflink_copy(args.source, args.destination)
        _atomic_lvsnap(args.source, args.destination, show_changes=args.show_changes)
    except Exception:
        if args.destination.exists():
            with _measure('destination remove after lvm mount'):
                # shutil.rmtree: pathlib.Path provides no rmtree() method.
                rmtree(args.destination)
        raise


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment