Last active
January 4, 2024 09:08
-
-
Save Girgitt/c69b461c5bf4dad42cee3421b0ac0e2f to your computer and use it in GitHub Desktop.
disk cleaner - keeps the system running in case of disk-space exhaustion, e.g. caused by excessive logging or another unexpected condition
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import os | |
import time | |
import datetime | |
import argparse | |
from sys import exit | |
from pathlib import Path | |
from typing import Iterator, Tuple | |
from shutil import disk_usage, rmtree | |
""" | |
# based on https://askubuntu.com/a/1386831/845237 | |
# extended with args: recursive, days_to_keep, size_to_keep, pattern | |
# made more robust by avoiding /proc dir and skipping on FileNotFound exceptions when sorting glob result | |
# regular logs but older than two weeks | |
python3 disk_cleaner_main.py /var/log/ -t 93 --pattern=*.log,*.log.*,*log*.gz,*log*.zip --recursive --days_to_keep=14 --size_to_keep=1 | |
python3 disk_cleaner_main.py /var/log/ -t 93 --pattern=*.log,*.log.*,*log*.gz,*log*.zip --recursive --days_to_keep=5 --size_to_keep=1 | |
# huge logs older than a day (probably not rotated properly etc.) | |
python3 disk_cleaner_main.py /var/log/ -t 93 --pattern=*.log,*.log.*,*log*.gz,*log*.zip,*.journal --recursive --days_to_keep=1 --size_to_keep=60 | |
python3 disk_cleaner_main.py /var/log/ -t 93 --pattern=*.log,*.log.*,*log*.gz,*log*.zip,*.journal --recursive --days_to_keep=1 --size_to_keep=10 | |
""" | |
def get_device(path: Path) -> str: | |
"""Find the mount for a given directory. This is needed only for logging purpose.""" | |
# Read /etc/mtab to learn about mount points | |
mtab_entries = Path("/etc/mtab").read_text().splitlines() | |
# Create a dict of mount points and devices | |
mount_points = dict([list(reversed(line.split(" ")[:2])) for line in mtab_entries]) | |
# Find the mount point of given path | |
while path.resolve(True).as_posix() not in mount_points: | |
path = path.parent | |
# Return device associated with mount point | |
return mount_points[path.as_posix()] | |
def get_directory_and_device(path: str) -> Tuple[str, Path]:
    """Validate *path* and return ``(device, Path)``; exits the process on invalid input."""
    fs_path = Path(path)
    # Refuse to run against a path that does not exist ...
    if not fs_path.exists():
        print(f"ERROR: No such directory: {path}")
        exit(1)
    # ... or that is not a directory.
    if not fs_path.is_dir():
        print(f"Path must be a directory and not a file: {path}")
        exit(1)
    return get_device(fs_path), fs_path
def get_disk_usage(path: Path) -> float:
    """Return usage of the filesystem holding *path* as a percentage (0 if total size is 0)."""
    # shutil.disk_usage accepts path-like objects, no str() cast needed.
    stats = disk_usage(path)
    if stats.total == 0:
        # Degenerate filesystem (or stat anomaly): report it as empty
        # instead of dividing by zero.
        return 0
    return stats.used / stats.total * 100
def remove_file_or_directory(path: Path, remove_directories=False) -> None:
    """Delete *path*: plain files are always removed, directory trees only when allowed."""
    if path.is_file():
        path.unlink()
    elif remove_directories and path.is_dir():
        # Recursively delete the whole directory tree.
        rmtree(path)
def find_oldest_files(
    path: Path,
    pattern: str = "*",
    threshold: int = 80,
    recursive: bool = False,
    days_to_keep: int = 14,
    size_to_keep: int = 1
) -> Iterator[Tuple[Path, os.stat_result]]:
    """Yield ``(file, stat_result)`` pairs of removal candidates, oldest first.

    Candidates are entries under *path* matching any comma-separated glob in
    *pattern* (prefixed with ``**/`` when *recursive*), excluding symlinks and
    anything under /proc.  Files newer than *days_to_keep* days or smaller
    than *size_to_keep* MB are never yielded.  Iteration stops as soon as
    disk usage drops below *threshold* percent.

    Fixes vs. previous version: the glob is run exactly once per pattern
    (the old retry loop had no success break, so matches were collected
    repeatedly and duplicated); the loop variable no longer shadows the
    ``pattern`` parameter; the return annotation matches what is yielded.
    """
    candidates = []
    for single_pattern in pattern.split(","):
        if recursive:
            single_pattern = "**/" + single_pattern
        # Materialize the glob into a list so a FileNotFoundError (entry
        # vanishing mid-scan) surfaces here, where it can be handled, rather
        # than later during iteration of a half-consumed generator.
        try:
            matches = list(path.glob(single_pattern))
        except FileNotFoundError:
            print(f"failed to get files for pattern {single_pattern}")
            continue
        for item in matches:
            try:
                # Never touch the proc pseudo-filesystem.
                if str(item).startswith('/proc'):
                    continue
                # Keep directories and regular (non-symlink) files.
                if item.is_dir() or (item.is_file() and not item.is_symlink()):
                    candidates.append(item)
            except PermissionError:
                print(f"failed to check file {item} due to permission error")
    # Snapshot mtimes before sorting so the sort cannot blow up on files
    # that were deleted concurrently.
    snapshot = []
    for candidate in candidates:
        try:
            snapshot.append((os.stat(candidate).st_mtime, candidate))
        except FileNotFoundError:
            continue
    # Sort on mtime only (stable), oldest first.
    snapshot.sort(key=lambda pair: pair[0])
    min_age_seconds = int(days_to_keep) * 24 * 60 * 60
    for _, file in snapshot:
        # Stop yielding as soon as the disk is back under the threshold.
        if get_disk_usage(path) < threshold:
            break
        try:
            fstat = os.stat(file)
        except FileNotFoundError:
            print(f"could not stat file: {file}; skipping it")
            continue
        if time.time() - fstat.st_mtime < min_age_seconds:
            # Too young to delete.
            continue
        if size_to_keep > fstat.st_size / 1024 / 1024:
            # Too small to be worth deleting.
            continue
        yield file, fstat
def check_and_clean(
    path: str,
    threshold: int = 80,
    patterns: str = "*",
    remove: bool = False,
    remove_directories: bool = False,
    recursive: bool = False,
    days_to_keep: int = 14,
    size_to_keep: int = 1,
) -> None:
    """Main routine: report (and optionally delete) old/large files while usage exceeds *threshold*."""
    device, fspath = get_directory_and_device(path)
    # shutil.disk_usage accepts both str and Path, no cast required.
    usage = get_disk_usage(path)
    # Nothing to do while usage stays at or below the threshold.
    if float(usage) <= threshold:
        return
    print(f"Disk usage is greater than threshold: {usage:.2f}% > {threshold}% ({device})")
    # Walk removal candidates, oldest first.
    for file, fstat in find_oldest_files(fspath, patterns, threshold, recursive, days_to_keep, size_to_keep):
        size_mb = round(fstat.st_size / 1024 / 1024, 2)
        mtime = datetime.datetime.fromtimestamp(fstat.st_mtime)
        print(f"Removing file {file}, size: {size_mb} MB, mtime: {mtime}")
        # Dry-run by default: only delete when explicitly requested.
        if remove:
            remove_file_or_directory(file, remove_directories)
def main() -> None: | |
parser = argparse.ArgumentParser( | |
description="Purge old files when disk usage is above limit." | |
) | |
parser.add_argument( | |
"path", help="Directory path where files should be purged", type=str | |
) | |
parser.add_argument( | |
"--threshold", | |
"-t", | |
metavar="T", | |
help="Usage threshold in percentage", | |
type=int, | |
default=80, | |
) | |
parser.add_argument( | |
"--pattern", | |
metavar="P", | |
help="file names patterns separated with comma", | |
type=str, | |
default="*", | |
) | |
parser.add_argument( | |
"--days_to_keep", | |
help="Only files older than <7> days will be deleted", | |
type=int, | |
default=7, | |
) | |
parser.add_argument( | |
"--size_to_keep", | |
help="Only files bigger than <1> MB will be deleted", | |
type=int, | |
default=1, | |
) | |
parser.add_argument( | |
"--remove", | |
"--rm", | |
help="Files are not removed unless --removed or --rm option is specified", | |
action="store_true", | |
default=False, | |
) | |
parser.add_argument( | |
"--include_directories", | |
help="remove also directories", | |
action="store_true", | |
default=False, | |
) | |
parser.add_argument( | |
"--recursive", | |
help="search also sub directories of the base path", | |
action="store_true", | |
default=False, | |
) | |
args = parser.parse_args() | |
check_and_clean( | |
args.path, | |
threshold=args.threshold, | |
patterns=args.pattern, | |
remove=args.remove, | |
remove_directories=args.include_directories, | |
recursive=args.recursive, | |
days_to_keep=args.days_to_keep, | |
size_to_keep=args.size_to_keep | |
) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment