#!/usr/bin/env python
"""
Find datasets on disk that shouldn't be there.

This script is not object store-aware: you point it at one or more directories that are DiskObjectStore backends
(e.g. the directory set by ``file_path`` in the Galaxy config by default).
"""
from __future__ import absolute_import, print_function
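
# Example invocation (script name and paths below are hypothetical):
#
#   python find_bad_datasets.py --galaxy /srv/galaxy/server \
#       --output-dir /tmp/bad-datasets /srv/galaxy/datasets
#
# Findings are written to per-category text files in the output directory
# (link_dirs.txt, unpurged_efp_dirs.txt, invalid_dirs.txt, link_files.txt,
# unpurged_files.txt, invalid_files.txt).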

import sys
from argparse import ArgumentParser
from errno import EEXIST
from os import (
    chdir,
    getcwd,
    makedirs,
    readlink,
    walk
)
from os.path import (
    exists,
    getsize,
    islink,
    join
)

try:
    from galaxy.config import Configuration
    from galaxy.model import mapping
    from galaxy.util import nice_size
    from galaxy.util.script import app_properties_from_args, populate_config_args
except ImportError:
    # Galaxy is not importable yet; import_galaxy() fills these in once --galaxy is known.
    Configuration = None
    mapping = None
    nice_size = None
    app_properties_from_args = None
    populate_config_args = None


def parse_args(argv):
    parser = ArgumentParser()
    parser.add_argument('-g', '--galaxy', help='Path to Galaxy')
    parser.add_argument('-o', '--output-dir', default=getcwd(), help='Path to output directory')
    parser.add_argument('-l', '--log-file', default=sys.stderr,
                        help='Path to log file (relative to OUTPUT_DIR if not absolute)')
    if populate_config_args:
        print('reparsing arguments with galaxy.util.script', file=sys.stderr)
        populate_config_args(parser)
        parser.add_argument('dirs', nargs='+', help='Dir(s) to search')
        return parser.parse_args(argv)
    else:
        print('parsing arguments without galaxy.util.script', file=sys.stderr)
        parser.add_argument('dirs', nargs='+', help='Dir(s) to search')
        # Galaxy's config arguments aren't registered yet, so ignore anything unrecognized on this pass
        return parser.parse_known_args(argv)[0]


def import_galaxy(args):
    assert args.galaxy, "'galaxy' not on sys.path, `--galaxy` must be set"
    print('importing galaxy from: {}'.format(args.galaxy), file=sys.stderr)
    sys.path.append(join(args.galaxy, 'lib'))
    print('changing dir to {}'.format(args.galaxy), file=sys.stderr)
    chdir(args.galaxy)
    global Configuration
    _t = __import__('galaxy.config', globals(), locals(), ['Configuration'], 0)
    Configuration = _t.Configuration
    global mapping
    _t = __import__('galaxy.model', globals(), locals(), ['mapping'], 0)
    mapping = _t.mapping
    global nice_size
    _t = __import__('galaxy.util', globals(), locals(), ['nice_size'], 0)
    nice_size = _t.nice_size
    global app_properties_from_args
    global populate_config_args
    _t = __import__('galaxy.util.script', globals(), locals(), ['app_properties_from_args', 'populate_config_args'], 0)
    app_properties_from_args = _t.app_properties_from_args
    populate_config_args = _t.populate_config_args
    return mapping


def get_model(config):
    print('initializing model', file=sys.stderr)
    return mapping.init(config.file_path, config.database_connection, create_tables=False)


class Total(object):
    """Mutable byte-count accumulator, so size_sum() can update several running totals in place."""

    def __init__(self, total=0):
        self.__total = total

    def __nonzero__(self):
        # Python 2 spelling of __bool__
        return self.__bool__()

    def __bool__(self):
        return bool(self.__total)

    def __iadd__(self, other):
        self.__total += int(other)
        return self

    def __add__(self, other):
        return self.__total + int(other)

    def __int__(self):
        return self.__total

    def __str__(self):
        return str(self.__total)

    @property
    def human(self):
        return nice_size(self.__total)


class Outputs(object):
    def __init__(self, output_dir, log_file):
        self.__output_dir = output_dir
        self.__log_file = log_file
        self.__files = {}
        try:
            makedirs(self.__output_dir)
        except (OSError, IOError) as exc:
            if exc.errno != EEXIST:
                raise
        try:
            # if log_file is already an open file (the sys.stderr default), this works...
            self.log('logging to: stderr')
        except AttributeError:
            # ...but if it's a path string, print() raises AttributeError, so open it for writing
            print('logging to: {}'.format(self.__log_file), file=sys.stderr)
            self.__log_file = open(join(self.__output_dir, self.__log_file), 'w')
            self.log('opened log: {}'.format(self.__log_file.name))

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        for f in self.__files.values():
            f.close()

    def __getitem__(self, name):
        # lazily open one output file per finding category, e.g. outputs['unpurged_files']
        if name not in self.__files:
            self.__files[name] = open(join(self.__output_dir, name + '.txt'), 'w')
        return self.__files[name]

    def log(self, msg):
        print(msg, file=self.__log_file)

    def log_oserror(self, exc):
        self.log('WARNING: {}: [Errno {}] {}: {}'.format(exc.__class__.__name__, exc.errno, exc.strerror, exc.filename))


class BadFinder(object):
    def __init__(self, outputs, cumulative, model, model_object_name='Dataset'):
        self.outputs = outputs
        self.cumulative = cumulative
        self.model = model
        self.context = model.context.current
        self.model_object_name = model_object_name
        if model_object_name == 'Dataset':
            self.model_object = model.Dataset
            self.prefix = 'dataset_'
        elif model_object_name == 'MetadataFile':
            self.model_object = model.MetadataFile
            self.prefix = 'metadata_'
        else:
            raise Exception('Unknown model object: {}'.format(model_object_name))

    def log_summary(self, total, msg, c='-'):
        if total:
            sep = c * 2
            self.outputs.log('{} {}: {} ({}) {} {}'.format(sep, self.cumulative.human, total.human, total, msg, sep))

    def sum_dir(self, root):
        total = Total()
        for dirpath, dirnames, filenames in walk(root):
            for filename, path in iter_contents(dirpath, filenames):
                try:
                    size = size_sum(getsize(path), total)
                    self.outputs.log(
                        '{}: {} ({}) member {}'.format(
                            nice_size(self.cumulative + total), nice_size(size), size, path
                        )
                    )
                except (IOError, OSError) as exc:
                    self.outputs.log_oserror(exc)
        return total

    def check_link_dirs(self, dirs):
        links = []
        for dirname, path in dirs:
            if islink(path):
                self.outputs.log('BAD: dir is link!: {} -> {}'.format(path, readlink(path)))
                self.outputs['link_dirs'].write('{}\n'.format(path))
                links.append(dirname)
        return links

    def check_efp_dirs(self, dirs):
        total = Total()
        for dirname, path in dirs:
            # find all efp dirs, sum all should-be-purged
            dataset_id = get_dataset_id(dirname, self.prefix, '_files')
            d = self.context.query(self.model_object).get(dataset_id)
            if not d:
                self.outputs.log('WARNING: No dataset returned for ID: {}'.format(dataset_id))
                continue
            if d.purged:
                size = size_sum(self.sum_dir(path), total, self.cumulative)
                self.outputs['unpurged_efp_dirs'].write('{} {}\n'.format(size, path))
                # size is itself a Total here; unwrap it so log_summary gets a plain int
                self.log_summary(Total(int(size)), 'contents of unpurged efp dir {}/'.format(path))
        return total

    def check_invalid_dirs(self, dirs):
        total = Total()
        for dirname, path in dirs:
            size = size_sum(self.sum_dir(path), total, self.cumulative)
            self.outputs['invalid_dirs'].write('{} {}\n'.format(size, path))
            self.log_summary(Total(int(size)), 'contents of invalid dir {}/'.format(path))
        return total

    def bad_file(self, path, output='invalid_files'):
        total = Total()
        try:
            size = size_sum(getsize(path), total, self.cumulative)
            self.outputs.log('{}: {} ({}) {} {}'.format(self.cumulative.human, nice_size(size), size, output, path))
            self.outputs[output].write('{} {}\n'.format(size, path))
        except (IOError, OSError) as exc:
            self.outputs.log_oserror(exc)
        return total

    def check_files(self, files):
        total = Total()
        for filename, path in files:
            if islink(path):
                self.outputs.log('BAD: file is link!: {} -> {}'.format(path, readlink(path)))
                self.outputs['link_files'].write('{}\n'.format(path))
                continue
            if filename.startswith(self.prefix) and filename.endswith('.dat'):
                try:
                    dataset_id = get_dataset_id(filename, self.prefix, '.dat')
                except ValueError:
                    self.outputs.log('WARNING: file has dataset-like filename but is invalid: {}'.format(path))
                    total += self.bad_file(path)
                    continue
                d = self.context.query(self.model_object).get(dataset_id)
                if not d:
                    self.outputs.log('WARNING: No dataset returned for ID: {}'.format(dataset_id))
                    continue
                if d.purged:
                    total += self.bad_file(path, 'unpurged_files')
            else:
                total += self.bad_file(path)
        return total

    def walk_path(self, path):
        cumulative = Total()
        self.outputs.log('walking: {}'.format(path))
        for dirpath, dirnames, filenames in walk(path):
            self.outputs.log('walked to: {}'.format(dirpath))
            total = Total()
            rel = dirpath[len(path.rstrip('/')) + 1:]
            # we should not be walking in any subdirs of hash dirs in the outer walk
            if not dirpath == path and not is_hash_dir(rel):
                self.outputs.log('WARNING: invalid dir (how did we get here?): {}'.format(dirpath))
                self.outputs.log('WARNING: ignoring subdirs: {}'.format(dirnames))
                del dirnames[:]
                continue
            # walk _metadata_files (at the root - otherwise it's invalid) later if it exists
            if dirpath == path:
                try:
                    dirnames.remove('_metadata_files')
                except ValueError:
                    pass
            # remove symlink dirs before dividing
            links = self.check_link_dirs(iter_contents(dirpath, dirnames))
            for link in links:
                dirnames.remove(link)
            # classify subdirs
            hash_subdirs, efp_subdirs, invalid_subdirs = divide_subdirs(dirnames, self.prefix)
            # remove all non-hash dirs from dirnames
            del dirnames[:]
            dirnames.extend(hash_subdirs)
            # find bad stuff
            efp_total = Total()
            dirs_total = Total()
            files_total = Total()
            size_sum(self.check_efp_dirs(iter_contents(dirpath, efp_subdirs)), efp_total, total, cumulative)
            self.log_summary(
                efp_total, 'contents of all unpurged {} efp dirs in {}/'.format(self.model_object_name, dirpath), c="="
            )
            size_sum(self.check_invalid_dirs(iter_contents(dirpath, invalid_subdirs)), dirs_total, total, cumulative)
            self.log_summary(dirs_total, 'contents of all invalid directories in {}/'.format(dirpath), c="=")
            size_sum(self.check_files(iter_contents(dirpath, filenames)), files_total, total, cumulative)
            self.log_summary(files_total, 'all unpurged/invalid files in {}/'.format(dirpath), c="=")
            self.log_summary(total, 'all direct (non-recursive) bad contents of {}/'.format(dirpath), c="#")
        mfpath = join(path, '_metadata_files')
        if exists(mfpath):
            finder = BadFinder(self.outputs, self.cumulative, self.model, model_object_name='MetadataFile')
            finder.walk_path(mfpath)
        self.log_summary(cumulative, 'total bad contents of {}/'.format(path), c="^")
        return cumulative


def size_sum(s, *totals):
    """Add s to each total (totals must be mutable, e.g. instances of Total) and return s."""
    for total in totals:
        total += s
    return s
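
# e.g. size_sum(42, total_a, total_b) adds 42 to both Total instances and
# returns 42, so one measurement can feed several running totals at once.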


def iter_contents(dirpath, contents):
    for content in contents:
        yield (content, join(dirpath, content))


def is_hash_dir(dirpath):
    try:
        for elem in dirpath.split('/'):
            assert int(elem) < 1000
        return True
    except (AssertionError, ValueError):
        pass
    return False
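
# Illustrative: is_hash_dir('000') and is_hash_dir('000/001') are True (every
# path element is an integer < 1000); is_hash_dir('dataset_1_files') is False.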


def get_dataset_id(name, prefix, suffix):
    return int(name[len(prefix):][:-len(suffix)])
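
# e.g. get_dataset_id('dataset_123.dat', 'dataset_', '.dat') == 123; raises
# ValueError if the middle of the name is not an integer.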


def divide_subdirs(dirnames, prefix):
    """Split dirnames into (hash subdirs, extra-files-path dirs, invalid dirs).

    Should only be called at the root of a valid hash dir.
    """
    hashdirs = []
    efps = []
    bads = []
    for dirname in dirnames:
        try:
            # hash subdir
            assert int(dirname) < 1000
            hashdirs.append(dirname)
            continue
        except (AssertionError, ValueError):
            pass
        try:
            # extra files path ("efp") dir, e.g. dataset_123_files
            assert dirname.startswith(prefix) and dirname.endswith('_files')
            get_dataset_id(dirname, prefix, '_files')
            efps.append(dirname)
            continue
        except (AssertionError, ValueError):
            pass
        bads.append(dirname)
    return (sorted(hashdirs), sorted(efps), sorted(bads))


def walk_all(dirs, model, outputs):
    total = Total()
    for path in dirs:
        outputs.log('finding files in: {}'.format(path))
        finder = BadFinder(outputs, total, model)
        finder.walk_path(path)
    outputs.log('** {} ({}) total bad contents **'.format(total.human, total))


def main(argv):
    args = parse_args(argv[1:])
    cwd = getcwd()
    if mapping is None:
        import_galaxy(args)
        # re-parse now that populate_config_args can register Galaxy's config arguments
        args = parse_args(argv[1:])
    properties = app_properties_from_args(args)
    config = Configuration(**properties)
    model = get_model(config)
    print('changing dir to {}'.format(cwd), file=sys.stderr)
    chdir(cwd)
    with Outputs(args.output_dir, args.log_file) as outputs:
        walk_all(args.dirs, model, outputs)


if __name__ == '__main__':
    main(sys.argv)