Created
December 2, 2014 18:14
-
-
Save leandro-lucarella-sociomantic/174bf7b182e2e7289be7 to your computer and use it in GitHub Desktop.
Purge Debian packages files from a directory
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import os | |
import sys | |
import argparse | |
from datetime import datetime, timedelta | |
debug = False | |
quiet = False | |
def main(): | |
args = parse_args() | |
global debug, quiet | |
debug = args.debug | |
quiet = args.quiet | |
now = datetime.now() | |
older_than = now - timedelta(args.older_than) | |
for d in args.directory: | |
purge_directory(d, now, older_than, args.keep, args.remove) | |
def purge_directory(directory, now, older_than, keep, remove): | |
debugf("Directory: {}", directory) | |
packages = PackageRegistry() | |
for f in get_files(directory): | |
if not f.endswith('.deb'): | |
debugf("\tSkipping file {}, is not a package", f) | |
continue | |
packages.add_file(f) | |
for pkg in packages.values(): | |
purge_pkg(pkg, now, older_than, keep, remove) | |
def purge_pkg(pkg, now, older_than, keep, remove): | |
debugf("\tPackage: {}", pkg) | |
sorted_files = sorted(pkg.files, key=lambda f: f.mtime, reverse=True) | |
to_purge = [f for f in sorted_files[keep:] if f.mtime <= older_than] | |
to_keep = sorted_files[:-len(to_purge)] if to_purge else sorted_files | |
for f in sorted_files: | |
debugf("\t\t{} ({} day(s) old)", f, (now - f.mtime).days) | |
debugf("\t\tTo keep:") | |
for f in to_keep: | |
debugf("\t\t\t{} ({} day(s) old)", f, (now - f.mtime).days) | |
debugf("\t\tTo purge:") | |
for f in to_purge: | |
debugf("\t\t\t{} ({} day(s) old)", f, (now - f.mtime).days) | |
printf("{}", f) | |
if remove: | |
try: | |
os.unlink(str(f)) | |
except Exception as e: | |
errorf("Can't remove {}: {}", f, e) | |
def get_files(d): | |
try: | |
return [os.path.join(d, f) for f in os.listdir(d)] | |
except Exception as e: | |
die(2, "Can't read directory {}: {}", d, e) | |
class PackageRegistry (dict): | |
def add_file(self, fname): | |
pkg_file = PackageFile(fname) | |
pkg_name = pkg_file.pkg_name | |
if pkg_name not in self: | |
self[pkg_name] = Package(pkg_name) | |
self[pkg_name].append(pkg_file) | |
class Package (list): | |
def __init__(self, name, *args): | |
super(Package, self).__init__(*args) | |
self.name = name | |
def __repr__(self): | |
s = self.__class__.__name__ + '(name=' + repr(self.name) | |
if len(self): | |
s += ', ' + ', '.join([repr(f) for f in self]) | |
s += ')' | |
return s | |
def __str__(self): | |
return self.name | |
@property | |
def files(self): | |
return self | |
class PackageFile: | |
def __init__(self, fname): | |
self.file_name = fname | |
name, version = os.path.basename(fname).split('_')[:2] | |
self.pkg_name = name | |
self.pkg_version = version | |
self.mtime = datetime.fromtimestamp(os.stat(fname).st_mtime) | |
def __repr__(self): | |
s = self.__class__.__name__ + '(' | |
s += ', '.join(["{}={!r}".format(a, getattr(self, a)) | |
for a in 'pkg_name pkg_version mtime'.split()]) | |
s += ')' | |
return s | |
def __str__(self): | |
return self.file_name | |
def parse_args(): | |
parser = argparse.ArgumentParser( | |
description='Search for old packages to purge') | |
parser.add_argument('directory', nargs='+', | |
help='directory to purge') | |
parser.add_argument('-o', '--older-than', | |
metavar='DAYS', type=int, default=365, | |
help='purge packages older than DAYS days') | |
parser.add_argument('-k', '--keep', metavar='N', type=int, default=1, | |
help='always keep at least the last (newest) N ' | |
'versions of each package') | |
parser.add_argument('-r', '--remove', action='store_true', | |
help="also actually remove the files") | |
parser.add_argument('-d', '--debug', action='store_true', | |
help='show debug output') | |
parser.add_argument('-q', '--quiet', action='store_true', | |
help="don't show any messages (only errors)") | |
return parser.parse_args() | |
def errorf(fmt, *args): | |
print(fmt.format(*args), file=sys.stderr) | |
def debugf(fmt, *args): | |
if debug: | |
errorf(fmt, *args) | |
def printf(fmt, *args): | |
if not quiet: | |
print(fmt.format(*args)) | |
def die(status, fmt=None, *args): | |
if fmt is not None: | |
errorf(fmt, *args) | |
sys.exit(status) | |
if __name__ == "__main__": | |
main() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment