@natefoo
Created September 8, 2021 16:12
#!/usr/bin/env python
"""
Finds datasets on disk that shouldn't be there.
This script is not object-store aware: you simply point it at directories that serve as DiskObjectStore backends
(e.g. the default file_path setting in the Galaxy config).
"""

from __future__ import absolute_import, print_function

import sys
from argparse import ArgumentParser
from errno import EEXIST
from os import (
    chdir,
    getcwd,
    makedirs,
    readlink,
    walk,
)
from os.path import (
    exists,
    getsize,
    islink,
    join,
)

# if Galaxy is not importable yet, these are filled in later by import_galaxy()
try:
    from galaxy.config import Configuration
    from galaxy.model import mapping
    from galaxy.util import nice_size
    from galaxy.util.script import app_properties_from_args, populate_config_args
except ImportError:
    Configuration = None
    mapping = None
    nice_size = None
    app_properties_from_args = None
    populate_config_args = None


def parse_args(argv):
    parser = ArgumentParser()
    parser.add_argument('-g', '--galaxy', help='Path to Galaxy')
    parser.add_argument('-o', '--output-dir', default=getcwd(), help='Path to output directory')
    parser.add_argument('-l', '--log-file', default=sys.stderr,
                        help='Path to log file (relative to OUTPUT_DIR if not absolute)')
    if populate_config_args:
        print('reparsing arguments with galaxy.util.script', file=sys.stderr)
        populate_config_args(parser)
        parser.add_argument('dirs', nargs='+', help='Dir(s) to search')
        return parser.parse_args(argv)
    else:
        print('parsing arguments without galaxy.util.script', file=sys.stderr)
        parser.add_argument('dirs', nargs='+', help='Dir(s) to search')
        return parser.parse_known_args(argv)[0]


def import_galaxy(args):
    # import lazily with __import__ so the script can locate Galaxy via
    # --galaxy first, then rebind the module-level names set to None above
    assert args.galaxy, "'galaxy' not on sys.path, `--galaxy` must be set"
    print('importing galaxy from: {}'.format(args.galaxy), file=sys.stderr)
    sys.path.append(join(args.galaxy, 'lib'))
    print('changing dir to {}'.format(args.galaxy), file=sys.stderr)
    chdir(args.galaxy)
    global Configuration
    _t = __import__('galaxy.config', globals(), locals(), ['Configuration'], 0)
    Configuration = _t.Configuration
    global mapping
    _t = __import__('galaxy.model', globals(), locals(), ['mapping'], 0)
    mapping = _t.mapping
    global nice_size
    _t = __import__('galaxy.util', globals(), locals(), ['nice_size'], 0)
    nice_size = _t.nice_size
    global app_properties_from_args
    global populate_config_args
    _t = __import__('galaxy.util.script', globals(), locals(), ['app_properties_from_args', 'populate_config_args'], 0)
    app_properties_from_args = _t.app_properties_from_args
    populate_config_args = _t.populate_config_args
    return mapping


def get_model(config):
    print('initializing model', file=sys.stderr)
    return mapping.init(config.file_path, config.database_connection, create_tables=False)


class Total(object):
    def __init__(self, total=0):
        self.__total = total

    def __nonzero__(self):
        # Python 2 compatibility: defer truthiness to __bool__
        return self.__bool__()

    def __bool__(self):
        return bool(self.__total)

    def __iadd__(self, other):
        self.__total += int(other)
        return self

    def __add__(self, other):
        return self.__total + int(other)

    def __int__(self):
        return self.__total

    def __str__(self):
        return str(self.__total)

    @property
    def human(self):
        return nice_size(self.__total)
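
# Illustrative usage (not part of the original gist): Total is a mutable running
# counter that multiple callers can share and update by reference:
#
#   t = Total()
#   t += 512          # __iadd__ mutates the shared instance
#   int(t), bool(t)   # -> (512, True)
#   t.human           # formatted with galaxy.util.nice_size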


class Outputs(object):
    def __init__(self, output_dir, log_file):
        self.__output_dir = output_dir
        self.__log_file = log_file
        self.__files = {}
        try:
            makedirs(self.__output_dir)
        except (OSError, IOError) as exc:
            if exc.errno != EEXIST:
                raise
        try:
            # works if log_file is sys.stderr (the default); if it is a path
            # string, print() raises AttributeError and we open the file instead
            self.log('logging to: stderr')
        except AttributeError:
            print('logging to: {}'.format(self.__log_file), file=sys.stderr)
            self.__log_file = open(join(self.__output_dir, self.__log_file), 'w')
            self.log('opened log: {}'.format(self.__log_file.name))

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        for f in self.__files.values():
            f.close()

    def __getitem__(self, name):
        if name not in self.__files:
            self.__files[name] = open(join(self.__output_dir, name + '.txt'), 'w')
        return self.__files[name]

    def log(self, msg):
        print(msg, file=self.__log_file)

    def log_oserror(self, exc):
        self.log('WARNING: {}: [Errno {}] {}: {}'.format(exc.__class__.__name__, exc.errno, exc.strerror, exc.filename))
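
# The walker below assumes the standard DiskObjectStore layout (a sketch
# inferred from this script's checks, not authoritative):
#
#   <file_path>/
#       000/                    # hash dirs: every path element is an int < 1000
#           dataset_123.dat     # dataset files
#           dataset_123_files/  # "extra files path" (efp) dirs
#       _metadata_files/        # same layout, with metadata_<id>.dat files
#
# Symlinks, unparseable names, and files or efp dirs whose database row is
# purged (or missing) are reported as bad.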


class BadFinder(object):
    def __init__(self, outputs, cumulative, model, model_object_name='Dataset'):
        self.outputs = outputs
        self.cumulative = cumulative
        self.model = model
        self.context = model.context.current
        self.model_object_name = model_object_name
        if model_object_name == 'Dataset':
            self.model_object = model.Dataset
            self.prefix = 'dataset_'
        elif model_object_name == 'MetadataFile':
            self.model_object = model.MetadataFile
            self.prefix = 'metadata_'
        else:
            raise Exception('Unknown model object: {}'.format(model_object_name))

    def log_summary(self, total, msg, c='-'):
        if total:
            sep = c * 2
            self.outputs.log('{} {}: {} ({}) {} {}'.format(sep, self.cumulative.human, total.human, total, msg, sep))

    def sum_dir(self, root):
        total = Total()
        for dirpath, dirnames, filenames in walk(root):
            for filename, path in iter_contents(dirpath, filenames):
                try:
                    size = size_sum(getsize(path), total)
                    self.outputs.log(
                        '{}: {} ({}) member {}'.format(
                            nice_size(self.cumulative + total), nice_size(size), size, path
                        )
                    )
                except (IOError, OSError) as exc:
                    self.outputs.log_oserror(exc)
        return total

    def check_link_dirs(self, dirs):
        links = []
        for dirname, path in dirs:
            if islink(path):
                self.outputs.log('BAD: dir is link!: {} -> {}'.format(path, readlink(path)))
                self.outputs['link_dirs'].write('{}\n'.format(path))
                links.append(dirname)
        return links

    def check_efp_dirs(self, dirs):
        total = Total()
        for dirname, path in dirs:
            # find all efp dirs, sum the ones that should have been purged
            dataset_id = get_dataset_id(dirname, self.prefix, '_files')
            d = self.context.query(self.model_object).get(dataset_id)
            if not d:
                # mirror check_files(): don't crash on an efp dir with no database row
                self.outputs.log('WARNING: No dataset returned for ID: {}'.format(dataset_id))
                continue
            if d.purged:
                size = size_sum(self.sum_dir(path), total, self.cumulative)
                self.outputs['unpurged_efp_dirs'].write('{} {}\n'.format(size, path))
                self.log_summary(Total(size), 'contents of unpurged efp dir {}/'.format(path))
        return total

    def check_invalid_dirs(self, dirs):
        total = Total()
        for dirname, path in dirs:
            size = size_sum(self.sum_dir(path), total, self.cumulative)
            self.outputs['invalid_dirs'].write('{} {}\n'.format(size, path))
            self.log_summary(Total(size), 'contents of invalid dir {}/'.format(path))
        return total

    def bad_file(self, path, output='invalid_files'):
        total = Total()
        try:
            size = size_sum(getsize(path), total, self.cumulative)
            self.outputs.log('{}: {} ({}) {} {}'.format(self.cumulative.human, nice_size(size), size, output, path))
            self.outputs[output].write('{} {}\n'.format(size, path))
        except (IOError, OSError) as exc:
            self.outputs.log_oserror(exc)
        return total

    def check_files(self, files):
        total = Total()
        for filename, path in files:
            if islink(path):
                self.outputs.log('BAD: file is link!: {} -> {}'.format(path, readlink(path)))
                self.outputs['link_files'].write('{}\n'.format(path))
                continue
            if filename.startswith(self.prefix) and filename.endswith('.dat'):
                try:
                    dataset_id = get_dataset_id(filename, self.prefix, '.dat')
                except ValueError:
                    self.outputs.log('WARNING: file has dataset-like filename but is invalid: {}'.format(path))
                    total += self.bad_file(path)
                    continue
                d = self.context.query(self.model_object).get(dataset_id)
                if not d:
                    self.outputs.log('WARNING: No dataset returned for ID: {}'.format(dataset_id))
                    continue
                # a correctly named file is only bad if its dataset was purged
                if d.purged:
                    total += self.bad_file(path, 'unpurged_files')
            else:
                # anything not named like a dataset file does not belong here
                total += self.bad_file(path)
        return total

    def walk_path(self, path):
        cumulative = Total()
        self.outputs.log('walking: {}'.format(path))
        for dirpath, dirnames, filenames in walk(path):
            self.outputs.log('walked to: {}'.format(dirpath))
            total = Total()
            rel = dirpath[len(path.rstrip('/')) + 1:]
            # we should not be walking in any subdirs of hash dirs in the outer walk
            if dirpath != path and not is_hash_dir(rel):
                self.outputs.log('WARNING: invalid dir (how did we get here?): {}'.format(dirpath))
                self.outputs.log('WARNING: ignoring subdirs: {}'.format(dirnames))
                del dirnames[:]
                continue
            # walk _metadata_files (at the root - otherwise it's invalid) later if it exists
            if dirpath == path:
                try:
                    dirnames.remove('_metadata_files')
                except ValueError:
                    pass
            # remove symlink dirs before dividing
            links = self.check_link_dirs(iter_contents(dirpath, dirnames))
            for link in links:
                dirnames.remove(link)
            # classify subdirs
            hash_subdirs, efp_subdirs, invalid_subdirs = divide_subdirs(dirnames, self.prefix)
            # remove all non-hash dirs from dirnames so walk() does not descend into them
            del dirnames[:]
            dirnames.extend(hash_subdirs)
            # find bad stuff
            efp_total = Total()
            dirs_total = Total()
            files_total = Total()
            size_sum(self.check_efp_dirs(iter_contents(dirpath, efp_subdirs)), efp_total, total, cumulative)
            self.log_summary(
                efp_total, 'contents of all unpurged {} efp dirs in {}/'.format(self.model_object_name, dirpath), c="="
            )
            size_sum(self.check_invalid_dirs(iter_contents(dirpath, invalid_subdirs)), dirs_total, total, cumulative)
            self.log_summary(dirs_total, 'contents of all invalid directories in {}/'.format(dirpath), c="=")
            size_sum(self.check_files(iter_contents(dirpath, filenames)), files_total, total, cumulative)
            self.log_summary(files_total, 'all unpurged/invalid files in {}/'.format(dirpath), c="=")
            self.log_summary(total, 'all direct (non-recursive) bad contents of {}/'.format(dirpath), c="#")
        mfpath = join(path, '_metadata_files')
        if exists(mfpath):
            finder = BadFinder(self.outputs, self.cumulative, self.model, model_object_name='MetadataFile')
            finder.walk_path(mfpath)
        self.log_summary(cumulative, 'total bad contents of {}/'.format(path), c="^")
        return cumulative


def size_sum(s, *totals):
    """totals must be mutable (e.g. an instance of Total)"""
    for total in totals:
        total += s
    return s
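
# Illustrative usage (not part of the original gist): size_sum() adds one size
# to several running totals at once and passes the size through so callers can
# log it:
#
#   a, b = Total(), Total()
#   size_sum(100, a, b)   # -> 100
#   int(a), int(b)        # -> (100, 100)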


def iter_contents(dirpath, contents):
    for content in contents:
        yield (content, join(dirpath, content))


def is_hash_dir(dirpath):
    try:
        for elem in dirpath.split('/'):
            assert int(elem) < 1000
        return True
    except (AssertionError, ValueError):
        pass
    return False
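
# Illustrative examples (not part of the original gist): a hash dir path is one
# where every element parses as an integer below 1000, matching Galaxy's
# 3-digit hashed directory names:
#
#   is_hash_dir('000')        # -> True
#   is_hash_dir('000/999')    # -> True
#   is_hash_dir('dataset_1')  # -> False (ValueError on int())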


def get_dataset_id(name, prefix, suffix):
    return int(name[len(prefix):][:-len(suffix)])
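
# Illustrative examples (not part of the original gist): the ID is whatever
# sits between the prefix and suffix, and must parse as an int:
#
#   get_dataset_id('dataset_123.dat', 'dataset_', '.dat')    # -> 123
#   get_dataset_id('dataset_1_files', 'dataset_', '_files')  # -> 1
#   get_dataset_id('dataset_x.dat', 'dataset_', '.dat')      # raises ValueError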


def divide_subdirs(dirnames, prefix):
    # should only be called at the root of a valid hash dir
    hashdirs = []
    efps = []
    bads = []
    for dirname in dirnames:
        try:
            # hash subdir
            assert int(dirname) < 1000
            hashdirs.append(dirname)
            continue
        except (AssertionError, ValueError):
            pass
        try:
            # extra files path (efp) dir, e.g. dataset_<id>_files
            assert dirname.startswith(prefix) and dirname.endswith('_files')
            get_dataset_id(dirname, prefix, '_files')
            efps.append(dirname)
            continue
        except (AssertionError, ValueError):
            pass
        bads.append(dirname)
    return (sorted(hashdirs), sorted(efps), sorted(bads))
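
# Illustrative example (not part of the original gist): subdirs are split into
# hash dirs (walked further), efp dirs (checked against the database), and
# everything else (reported as invalid):
#
#   divide_subdirs(['000', 'dataset_1_files', 'tmp'], 'dataset_')
#   # -> (['000'], ['dataset_1_files'], ['tmp'])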


def walk_all(dirs, model, outputs):
    total = Total()
    for path in dirs:
        outputs.log('finding files in: {}'.format(path))
        finder = BadFinder(outputs, total, model)
        finder.walk_path(path)
    outputs.log('** {} ({}) total bad contents **'.format(total.human, total))


def main(argv):
    args = parse_args(argv[1:])
    cwd = getcwd()
    if mapping is None:
        import_galaxy(args)
        # re-parse now that galaxy.util.script is importable and can add its
        # own config arguments to the parser
        args = parse_args(argv[1:])
    properties = app_properties_from_args(args)
    config = Configuration(**properties)
    model = get_model(config)
    print('changing dir to {}'.format(cwd), file=sys.stderr)
    chdir(cwd)
    with Outputs(args.output_dir, args.log_file) as outputs:
        walk_all(args.dirs, model, outputs)


if __name__ == '__main__':
    main(sys.argv)