Skip to content

Instantly share code, notes, and snippets.

@Girgitt
Last active January 4, 2024 09:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Girgitt/c69b461c5bf4dad42cee3421b0ac0e2f to your computer and use it in GitHub Desktop.
Save Girgitt/c69b461c5bf4dad42cee3421b0ac0e2f to your computer and use it in GitHub Desktop.
disk cleaner - code to keep system running in case of disk space exhaustion e.g. by excessive logging or other unexpected condition
#!/usr/bin/env python3
import os
import time
import datetime
import argparse
from sys import exit
from pathlib import Path
from typing import Iterator, Tuple
from shutil import disk_usage, rmtree
"""
# based on https://askubuntu.com/a/1386831/845237
# extended with args: recursive, days_to_keep, size_to_keep, pattern
# made more robust by avoiding /proc dir and skipping on FileNotFound exceptions when sorting glob result
# regular logs but older than two weeks
python3 disk_cleaner_main.py /var/log/ -t 93 --pattern=*.log,*.log.*,*log*.gz,*log*.zip --recursive --days_to_keep=14 --size_to_keep=1
python3 disk_cleaner_main.py /var/log/ -t 93 --pattern=*.log,*.log.*,*log*.gz,*log*.zip --recursive --days_to_keep=5 --size_to_keep=1
# huge logs older than a day (probably not rotated properly etc.)
python3 disk_cleaner_main.py /var/log/ -t 93 --pattern=*.log,*.log.*,*log*.gz,*log*.zip,*.journal --recursive --days_to_keep=1 --size_to_keep=60
python3 disk_cleaner_main.py /var/log/ -t 93 --pattern=*.log,*.log.*,*log*.gz,*log*.zip,*.journal --recursive --days_to_keep=1 --size_to_keep=10
"""
def get_device(path: Path) -> str:
    """Return the device backing the filesystem that contains *path*.

    Used for logging purposes only.  Builds a mount-point -> device map
    from /etc/mtab, then climbs the directory tree until a known mount
    point is found.
    """
    # Parse /etc/mtab: each line starts with "<device> <mount_point> ...".
    mounts = {}
    for entry in Path("/etc/mtab").read_text().splitlines():
        fields = entry.split(" ")
        mounts[fields[1]] = fields[0]
    # Walk upward until the resolved path is a known mount point.
    while path.resolve(True).as_posix() not in mounts:
        path = path.parent
    # Device associated with that mount point.
    return mounts[path.as_posix()]
def get_directory_and_device(path: str) -> Tuple[str, Path]:
    """Validate *path* and return a ``(device, Path)`` pair.

    Exits the process (status 1) when *path* does not exist or is not a
    directory.
    """
    fs_path = Path(path)
    # Guard clauses: the target must exist and must be a directory.
    if not fs_path.exists():
        print(f"ERROR: No such directory: {path}")
        exit(1)
    if not fs_path.is_dir():
        print(f"Path must be a directory and not a file: {path}")
        exit(1)
    # Resolve the backing device (used only for log messages).
    return get_device(fs_path), fs_path
def get_disk_usage(path: Path) -> float:
    """Return the usage of the filesystem holding *path*, as a percentage.

    Returns 0 when the reported total capacity is zero, so callers never
    see a ZeroDivisionError.
    """
    # shutil.disk_usage accepts Path-like objects directly.
    stats = disk_usage(path)
    if stats.total == 0:
        return 0
    return stats.used / stats.total * 100
def remove_file_or_directory(path: Path, remove_directories=False) -> None:
    """Delete *path*.

    Regular files are unlinked unconditionally; directory trees are
    removed recursively only when *remove_directories* is true.
    """
    if path.is_file():
        path.unlink()
    elif path.is_dir() and remove_directories:
        # Recursively delete the whole directory tree.
        rmtree(path)
def find_oldest_files(
    path: Path,
    pattern: str = "*",
    threshold: int = 80,
    recursive: bool = False,
    days_to_keep: int = 14,
    size_to_keep: int = 1
) -> Iterator[Tuple[Path, os.stat_result]]:
    """Yield ``(path, stat_result)`` pairs for deletion candidates, oldest first.

    A candidate matches one of the comma-separated glob patterns in
    *pattern*, is not under /proc, is not a symlink, is older than
    *days_to_keep* days AND larger than *size_to_keep* MB.  Iteration
    stops as soon as disk usage drops below *threshold* percent.
    """
    files = []
    for glob_pattern in pattern.split(","):
        if recursive:
            glob_pattern = "**/" + glob_pattern
        # Materialize the glob so filesystem races surface here, where they
        # can be handled, rather than mid-iteration.  Entries can vanish
        # while the glob runs, so retry a few times.
        # BUG FIX: the original retry loop had no `break` on success, so a
        # successful glob was re-run and re-processed three times, producing
        # triplicate candidates; it also gave up (break) on the first
        # FileNotFoundError instead of retrying.
        matches = []
        for _attempt in range(3):
            try:
                matches = list(path.glob(glob_pattern))
                break  # success: do not repeat the glob
            except FileNotFoundError:
                print(f"failed to get files for pattern {glob_pattern}")
        for item in matches:
            try:
                # Never touch the /proc pseudo-filesystem.
                if str(item).startswith('/proc'):
                    continue
                # Keep regular (non-symlink) files and directories.
                if item.is_dir() or (item.is_file() and not item.is_symlink()):
                    files.append(item)
            except PermissionError:
                print(f"failed to check file {item} due to permission error")
    # Snapshot mtimes before sorting: files may disappear while we work,
    # and sorting directly on getmtime would raise FileNotFoundError.
    snapshot = []
    for candidate in files:
        try:
            snapshot.append((os.stat(candidate).st_mtime, candidate))
        except FileNotFoundError:
            continue
    snapshot.sort(key=lambda pair: pair[0])
    # Hoist the age cutoff out of the loop.
    min_age_seconds = int(days_to_keep) * 24 * 60 * 60
    # Yield candidates until usage is back under the threshold.
    for _mtime, file in snapshot:
        if get_disk_usage(path) < threshold:
            break
        try:
            fstat = os.stat(file)
        except FileNotFoundError:
            print(f"could not stat file: {file}; skipping it")
            continue
        if time.time() - fstat.st_mtime < min_age_seconds:
            continue  # too recent: keep it
        if size_to_keep > fstat.st_size / 1024 / 1024:
            continue  # too small: keep it
        yield file, fstat
def check_and_clean(
    path: str,
    threshold: int = 80,
    patterns: str = "*",
    remove: bool = False,
    remove_directories: bool = False,
    recursive: bool = False,
    days_to_keep: int = 14,
    size_to_keep: int = 1,
) -> None:
    """Main worker: purge the oldest matching files while disk usage
    exceeds *threshold* percent.

    Unless *remove* is true this is a dry run that only prints what would
    be deleted.
    """
    device, fspath = get_directory_and_device(path)
    usage = get_disk_usage(path)
    if float(usage) <= threshold:
        return  # nothing to do
    print(f"Disk usage is greater than threshold: {usage:.2f}% > {threshold}% ({device})")
    candidates = find_oldest_files(
        fspath, patterns, threshold, recursive, days_to_keep, size_to_keep
    )
    for file, fstat in candidates:
        print(f"Removing file {file}, size: {round(fstat.st_size / 1024 / 1024, 2)} MB, mtime: {datetime.datetime.fromtimestamp(fstat.st_mtime)}")
        if remove:
            remove_file_or_directory(file, remove_directories)
def main() -> None:
    """Parse command-line arguments and run the disk cleaner."""
    parser = argparse.ArgumentParser(
        description="Purge old files when disk usage is above limit."
    )
    parser.add_argument(
        "path", help="Directory path where files should be purged", type=str
    )
    parser.add_argument(
        "--threshold",
        "-t",
        metavar="T",
        help="Usage threshold in percentage",
        type=int,
        default=80,
    )
    parser.add_argument(
        "--pattern",
        metavar="P",
        help="file names patterns separated with comma",
        type=str,
        default="*",
    )
    parser.add_argument(
        "--days_to_keep",
        help="Only files older than <7> days will be deleted",
        type=int,
        default=7,
    )
    parser.add_argument(
        "--size_to_keep",
        help="Only files bigger than <1> MB will be deleted",
        type=int,
        default=1,
    )
    parser.add_argument(
        "--remove",
        "--rm",
        # BUG FIX: help text referred to a non-existent "--removed" flag.
        help="Files are not removed unless --remove or --rm option is specified",
        action="store_true",
        default=False,
    )
    parser.add_argument(
        "--include_directories",
        help="remove also directories",
        action="store_true",
        default=False,
    )
    parser.add_argument(
        "--recursive",
        help="search also sub directories of the base path",
        action="store_true",
        default=False,
    )
    args = parser.parse_args()
    check_and_clean(
        args.path,
        threshold=args.threshold,
        patterns=args.pattern,
        remove=args.remove,
        remove_directories=args.include_directories,
        recursive=args.recursive,
        days_to_keep=args.days_to_keep,
        size_to_keep=args.size_to_keep
    )
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment