Created
September 4, 2023 11:22
-
-
Save alaniwi/fa9e68cfbe6e8e238b317e3dd232855b to your computer and use it in GitHub Desktop.
thin_backups.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
Looks for files (or directories) in a given directory, whose filenames | |
match a specified strptime format. | |
For those that are more than a specified number of days old, deletes certain | |
files, such that the interval between the ones that are retained does not | |
exceed the maximum specified interval. | |
------ | |
Example: | |
thin-backups.py /path/to/somedir some_file_stem.%Y%m%d 30:7 180:30 | |
keeps weekly files after 30 days old, | |
and monthly(ish) files after 180 days old | |
------ | |
""" | |
import os | |
import re | |
import time | |
import shutil | |
import datetime | |
import argparse | |
from argparse import RawTextHelpFormatter | |
def time_spec_type(arg):
    """
    argparse type converter for a 'thin_after:max_interval' time spec.

    Both fields are whole numbers of days, e.g. '30:7'.

    Returns a 2-tuple of datetime.timedelta objects:
    (age after which thinning applies, maximum interval between retained
    entries).

    Raises argparse.ArgumentTypeError if arg is not of that form, or if a
    day count is too large for a timedelta.
    """
    # Raw string: '\d' in a plain string literal is an invalid escape.
    # fullmatch (unlike match with '$') also rejects a trailing newline.
    m = re.fullmatch(r'(\d+):(\d+)', arg)
    if not m:
        raise argparse.ArgumentTypeError(
            f'{arg!r} is not in the form thin_after:max_interval (e.g. 30:7)')
    thin_after = int(m.group(1))
    max_interval = int(m.group(2))
    try:
        return (datetime.timedelta(days=thin_after),
                datetime.timedelta(days=max_interval))
    except OverflowError as exc:
        # argparse only reports ArgumentTypeError/TypeError/ValueError
        # cleanly; convert so the user gets a usage error, not a traceback.
        raise argparse.ArgumentTypeError(
            f'{arg!r}: number of days is too large') from exc
def get_args(argv=None):
    """
    Parse and validate the command line.

    argv: optional list of argument strings to parse instead of the
    process command line; None (the default) lets argparse read
    sys.argv[1:].

    Returns the argparse.Namespace, with attributes directory, format,
    time_specification (a list of (thin_after, max_interval) timedelta
    pairs), dry_run, recursive_delete and verbose.

    Exits through parser.error() if the time specs are not given in
    increasing order.
    """
    parser = argparse.ArgumentParser(epilog=__doc__,
                                     formatter_class=RawTextHelpFormatter)
    parser.add_argument('directory', type=str,
                        help='directory path')
    parser.add_argument('format', type=str,
                        help=('strptime format for date parsing '
                              '(should match whole filename)'))
    parser.add_argument('time_specification', type=time_spec_type,
                        help=('Time spec in format thin_after:max_interval '
                              'e.g. 30:7 to retain weekly files after 30 days'),
                        nargs='+')
    parser.add_argument('-n', '--dry-run', action='store_true',
                        help='just print a list of what would be deleted')
    parser.add_argument('-r', '--recursive-delete', action='store_true',
                        help='entries are directories; use rmtree instead of remove')
    parser.add_argument('-v', '--verbose', action='store_true',
                        help='display some additional messages')
    args = parser.parse_args(argv)
    # The specs are walked in order and the walk stops at the first cutoff
    # the file is too young for, so they must already be sorted by age.
    if args.time_specification != sorted(args.time_specification):
        parser.error('time specs should be given in increasing order of age')
    return args
def get_entries(args):
    """
    Get a list of 2-tuples (path, datetime obj) for entries in
    args.directory whose whole name parses with the strptime format
    args.format.

    Returned oldest first, ordered by the parsed date rather than by
    filename, so the order is chronological even for formats (such as
    %d%m%Y) whose names do not sort by date. Entries with equal dates stay
    in name order.
    """
    ret = []
    for name in sorted(os.listdir(args.directory)):
        try:
            dt = datetime.datetime.strptime(name, args.format)
        except ValueError:
            if args.verbose:
                print(f'ignoring {name}, does not match {args.format}')
            continue
        path = os.path.join(args.directory, name)
        ret.append((path, dt))
        if args.verbose:
            print(f'found {path}, {dt}')
    # list.sort is stable, so the name order established above is kept
    # for entries that parse to the same date.
    ret.sort(key=lambda entry: entry[1])
    return ret
def get_max_interval_for_age(age, args):
    """
    Look up the maximum retention interval that applies to an entry of the
    given age (a timedelta).

    Walks args.time_specification in the order given and keeps the interval
    of the last spec whose cutoff the age has reached, stopping at the first
    cutoff it has not reached. Returns None when no cutoff is reached, i.e.
    the entry is too new to be thinned.
    """
    selected = None
    for cutoff, max_gap in args.time_specification:
        if age >= cutoff:
            selected = max_gap
            continue
        # Too young for this spec; do not look at any later ones.
        break
    return selected
def get_deletions(entries, args):
    """
    Works out which entries can be deleted, and returns a list of their
    paths.

    Expects entries as (path, datetime) 2-tuples in date order, oldest
    first: the interval below is (next entry's date) - (previous entry's
    date), which is only a positive span in that order. The caller's list
    is not modified.

    Steps through from oldest to newest. An entry can be deleted if:
      - its age has reached the thin_after of at least one time spec, and
      - it is not the first or last entry, and
      - the interval between the two snapshots either side of it (in date
        order, and excluding any older snapshot already listed for
        deletion) does not exceed the max interval for its age
    """
    deletions = []
    index = 1
    entries = entries[:]  # work on a copy
    # strptime yields timezone-aware datetimes when the format contains
    # %z; subtracting those from a naive now() raises TypeError, so build
    # "now" with the same tzinfo as the parsed dates (None keeps it naive).
    tzinfo = entries[0][1].tzinfo if entries else None
    today = datetime.datetime.now(tz=tzinfo)
    while index < len(entries) - 1:
        entry = entries[index]
        name = entry[0]
        age = today - entry[1]
        max_interval = get_max_interval_for_age(age, args)
        if max_interval is None:
            index += 1
            if args.verbose:
                print(f'{name} RETAIN [too new to delete]')
        else:
            # Gap that would be left between the neighbours if this entry
            # were removed.
            interval = entries[index + 1][1] - entries[index - 1][1]
            if interval <= max_interval:
                deletions.append(entry[0])
                # Drop it from the working copy and do not advance: the
                # next entry now sits at this index.
                del entries[index]
                if args.verbose:
                    print(f'{name} *DELETE* [{interval} <= {max_interval}]')
            else:
                index += 1
                if args.verbose:
                    print(f'{name} RETAIN [{interval} > {max_interval}]')
    return deletions
def do_deletion(path, args):
    """
    Remove a single entry from disk.

    Uses shutil.rmtree when args.recursive_delete is set, otherwise
    os.remove. When args.verbose is set, a message is printed before the
    removal.
    """
    if args.verbose:
        print(f'removing {path}')
    remover = shutil.rmtree if args.recursive_delete else os.remove
    remover(path)
def main():
    """
    Command-line entry point.

    Parses the arguments, collects the matching entries, works out which
    ones may be deleted, and prints each of those paths. The entries are
    actually removed unless --dry-run was given.
    """
    args = get_args()
    to_delete = get_deletions(get_entries(args), args)
    for path in to_delete:
        print(path)
        if args.dry_run:
            continue
        do_deletion(path, args)
# Run the thinning only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment