@natefoo
Last active October 1, 2018 16:16
Find stuff in Galaxy's file_path that shouldn't be there
#!/usr/bin/env python
"""
Find datasets in the database that aren't purged and figure out why.
"""
from __future__ import print_function

import sys
from argparse import ArgumentParser
#from calendar import monthrange
from collections import namedtuple
from errno import EEXIST
from os import (
    getcwd,
    makedirs
)
from os.path import (
    join
)

import psycopg2

try:
    from galaxy.config import Configuration
    from galaxy.util import nice_size
    from galaxy.util.script import app_properties_from_args, populate_config_args
except ImportError:
    Configuration = None
    nice_size = None
    app_properties_from_args = None
    populate_config_args = None
KEY_TO_DESC = {
    'active_library': 'active library dataset',
    'inactive_library': 'inactive library dataset',
    'non_library': 'dataset',
}

SUMMARY_DESC = (
    ('anon', 'of anonymous data'),
    ('owned', 'of data owned by active users'),
    ('deleted_user', 'of data owned by deleted users'),
    ('purged_user', 'of data owned by purged users'),
    ('ignored_user', 'of data owned by ignored users'),
    ('purged_hdas', 'of data with only purged HDA(s) (possibly recently purged)'),
    ('null_history_hda', 'of data with no associated history (orphaned data)'),
    ('unpurged_hda_in_purged_history', 'of unpurged HDAs in purged histories (overlaps with date data)'),
)

IGNORED_GROUP_SQL = """
    SELECT
        u.id AS u_id,
        u.email AS u_email
    FROM
        galaxy_group g
    JOIN
        user_group_association uga ON g.id = uga.group_id
    JOIN
        galaxy_user u ON uga.user_id = u.id
    WHERE
        g.name = %s
    ORDER BY
        u.id
"""

LDDA_SQL = """
    SELECT
        d.id AS d_id,
        ldda.deleted AS ldda_del
    FROM
        dataset d
    JOIN
        library_dataset_dataset_association ldda ON d.id = ldda.dataset_id
    WHERE
        NOT d.deleted
        AND NOT d.purged
    ORDER BY
        d.id
"""

HDA_SQL = """
    SELECT
        d.id AS d_id,
        d.total_size AS d_size,
        d.deleted AS d_del,
        d.purged AS d_pur,
        hda.id AS hda_id,
        hda.deleted AS hda_del,
        hda.purged AS hda_pur,
        h.id AS h_id,
        h.deleted AS h_del,
        h.purged AS h_pur,
        u.id AS u_id,
        u.email AS u_email,
        u.deleted AS u_del,
        u.purged AS u_pur,
        u.update_time AS u_time
    FROM
        dataset d
    LEFT OUTER JOIN
        history_dataset_association hda ON d.id = hda.dataset_id
    LEFT OUTER JOIN
        history h ON hda.history_id = h.id
    LEFT OUTER JOIN
        galaxy_user u ON h.user_id = u.id
    ORDER BY
        d.id
"""
Dataset = namedtuple('Dataset', ['id', 'size'])


def parse_args(argv):
    parser = ArgumentParser()
    parser.add_argument('-g', '--galaxy', help='Path to Galaxy')
    parser.add_argument('--ignore-users-file', help='Path to file containing list of users to ignore (one email per line)')
    parser.add_argument('--ignore-group', action='append', default=[], help='Ignore any user that is a member of the named group (can be specified multiple times)')
    parser.add_argument('-o', '--output-dir', default=getcwd(), help='Path to output directory')
    parser.add_argument('-l', '--log-file', default=sys.stderr,
                        help='Path to log file (relative to OUTPUT_DIR if not absolute)')
    if populate_config_args:
        print('reparsing arguments with galaxy.util.script', file=sys.stderr)
        populate_config_args(parser)
        return parser.parse_args(argv)
    else:
        print('parsing arguments without galaxy.util.script', file=sys.stderr)
        return parser.parse_known_args(argv)[0]
def import_galaxy(args):
    assert args.galaxy, "'galaxy' not on sys.path, `--galaxy` must be set"
    print('importing galaxy from: {}'.format(args.galaxy), file=sys.stderr)
    sys.path.append(join(args.galaxy, 'lib'))
    global Configuration
    _t = __import__('galaxy.config', globals(), locals(), ['Configuration'], 0)
    Configuration = _t.Configuration
    global nice_size
    _t = __import__('galaxy.util', globals(), locals(), ['nice_size'], 0)
    nice_size = _t.nice_size
    global app_properties_from_args
    global populate_config_args
    _t = __import__('galaxy.util.script', globals(), locals(), ['app_properties_from_args', 'populate_config_args'], 0)
    app_properties_from_args = _t.app_properties_from_args
    populate_config_args = _t.populate_config_args
class Outputs(object):
    def __init__(self, output_dir, log_file):
        self.__output_dir = output_dir
        self.__log_file = log_file
        self.__files = {}
        try:
            makedirs(self.__output_dir)
        except (OSError, IOError) as exc:
            if exc.errno != EEXIST:
                raise
        try:
            self.log('logging to: stderr')
        except AttributeError:
            print('logging to: {}'.format(self.__log_file), file=sys.stderr)
            self.__log_file = open(join(self.__output_dir, self.__log_file), 'w')
            self.log('opened log: {}'.format(self.__log_file.name))

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        for f in self.__files.values():
            f.close()

    def __getitem__(self, name):
        if name not in self.__files:
            self.__files[name] = open(join(self.__output_dir, name + '.txt'), 'w')
        return self.__files[name]

    @property
    def logging_to_terminal(self):
        return self.__log_file == sys.stderr

    def log(self, msg, end=None):
        # maybe not the best to have log messages silently dropped but these are really only useful on the command line
        if self.logging_to_terminal or end != '\r':
            print(msg, file=self.__log_file, end=end)
class DatasetAnalyzer(object):
    def __init__(self, outputs, conn_str, ignored_users_file=None, ignored_groups=None):
        self.outputs = outputs
        self.__conn_str = conn_str
        self.__ignored_users_file = ignored_users_file
        self.__ignored_groups = ignored_groups or []
        self.__conn = None
        self.__max_d_id = None
        self.__log_base = None
        self.library_active = None
        self.ignored_users = None
        self.cur_d = None
        self.data_summary = {}
        for key in KEY_TO_DESC.keys():
            self.data_summary[key] = {
                'anon': 0,
                'owned': 0,
                'deleted_user': 0,
                'purged_user': 0,
                'ignored_user': 0,
                'purged_hdas': 0,
                'null_history_hda': 0,
                'unpurged_hda_in_purged_history': 0,
            }
        self.data_summary['active_unreferenced_library'] = 0
        self.data_by_owner_date = {}
        self.set_log_base()
        self.set_ignored_users()
        self.collect_library_data()

    @property
    def conn(self):
        if self.__conn is None:
            self.__conn = psycopg2.connect(self.__conn_str)
        return self.__conn

    def log_d(self, msg, end=None):
        self.outputs.log(self.__log_base.format(self.cur_d.id, nice_size(self.cur_d.size), msg), end=end)

    def write_d(self, log_file, owners=None):
        msg = '{}\t{}'.format(self.cur_d.id, int(self.cur_d.size))
        if owners is not None:
            msg += '\t{}'.format(commafy(owners))
        self.outputs[log_file].write(msg + '\n')

    @property
    def max_dataset_id(self):
        if self.__max_d_id is None:
            c = self.conn.cursor()
            c.execute('SELECT max(id) FROM dataset')
            self.__max_d_id = c.fetchone()[0]
        return self.__max_d_id

    def set_log_base(self):
        self.__log_base = '{:>%d}: {:32} {}' % (len(str(self.max_dataset_id)))
    def set_ignored_users(self):
        def _add_user():
            self.outputs.log('{}: {}'.format(*row))
            self.ignored_users.add(int(row[0]))

        self.outputs.log('collecting ignored users...')
        cur = self.conn.cursor()
        self.ignored_users = set()
        for name in self.__ignored_groups:
            cur.execute(IGNORED_GROUP_SQL, (name,))
            for row in cur:
                _add_user()
        if self.__ignored_users_file is not None:
            with open(self.__ignored_users_file) as fh:
                for line in fh:
                    email = line.strip()
                    # psycopg2 uses %s placeholders and execute() does not return the
                    # cursor, so fetch the row in a separate step
                    cur.execute('SELECT id, email FROM galaxy_user WHERE email = %s', (email,))
                    row = cur.fetchone()
                    _add_user()
        self.outputs.log('done')
    def collect_library_data(self):
        self.outputs.log('collecting library datasets...', end='')
        cur = self.conn.cursor()
        cur.execute(LDDA_SQL)
        self.library_active = {}
        for row in cur:
            d_id, ldda_del = row
            # if one of this dataset's lddas was deleted but a later one was not, this dataset is active
            self.library_active[d_id] = self.library_active.get(d_id) or not ldda_del
        self.outputs.log('done')

    def add_dataset_to_date_bucket(self, newest_owner_date):
        if newest_owner_date not in self.data_by_owner_date:
            self.data_by_owner_date[newest_owner_date] = 0
        self.data_by_owner_date[newest_owner_date] += self.cur_d.size
    def process_current_dataset(self, assocs):
        anons = 0
        owners = set()
        ignored_owners = set()
        purged_hdas = set()
        null_history_hdas = set()
        purged_owners = set()
        deleted_owners = set()
        unpurged_hdas_in_purged_histories = set()
        newest_owner_date = None
        for assoc in assocs:
            hda_id, hda_del, hda_pur, h_id, h_del, h_pur, u_id, u_email, u_del, u_pur, u_time = assoc
            if h_pur and not hda_pur:
                unpurged_hdas_in_purged_histories.add(hda_id)
            if hda_pur:
                purged_hdas.add(hda_id)
            # hda_del is irrelevant since it still counts against the user
            elif u_id and u_pur:
                purged_owners.add(u_email)
            elif u_id and u_del:
                deleted_owners.add(u_email)
            elif u_id and u_id in self.ignored_users:
                ignored_owners.add(u_email)
            elif u_id:
                #owners.add((u_id, u_email, u_time))
                owners.add(u_email)
                newest_owner_date = max(newest_owner_date or u_time.date(), u_time.date())
            elif h_id is not None:
                # TODO: could be the same one, we'd need session IDs to check. but we don't care
                anons += 1
            elif h_id is None:
                null_history_hdas.add(hda_id)
            else:
                raise Exception('{}: broken HDA: {}'.format(self.cur_d.id, assoc))
        # runs from the 1st of the month
        if newest_owner_date and newest_owner_date.day != 1:
            month = newest_owner_date.month % 12 + 1
            year = newest_owner_date.year
            if month == 1:
                year += 1
            newest_owner_date = newest_owner_date.replace(year=year, month=month, day=1)
        # runs from the last day of the month
        #if newest_owner_date:
        #    newest_owner_date = newest_owner_date.replace(day=monthrange(newest_owner_date.year, newest_owner_date.month)[1])
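        # Worked example of the rounding above (illustrative): a newest owner login of
        # 2018-03-15 buckets under 2018-04-01, and 2018-12-02 rolls over to 2019-01-01;
        # a date already on the 1st is left as-is.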
        key = 'non_library'
        if self.library_active.get(self.cur_d.id) is True:
            key = 'active_library'
        elif self.library_active.get(self.cur_d.id) is False:
            key = 'inactive_library'
        pre = '{} '.format(KEY_TO_DESC[key])
        pre_file = pre.replace(' ', '_')
        if owners and anons:
            self.log_d(pre + 'referenced by {} anons and: {}'.format(anons, commafy(owners)))
            self.write_d(pre_file + 'owned', owners=owners)
            self.data_summary[key]['owned'] += self.cur_d.size
            if key != 'active_library':
                self.add_dataset_to_date_bucket(newest_owner_date)
        elif owners:
            self.log_d(pre + 'referenced by: {}'.format(commafy(owners)))
            self.write_d(pre_file + 'owned', owners=owners)
            self.data_summary[key]['owned'] += self.cur_d.size
            if key != 'active_library':
                self.add_dataset_to_date_bucket(newest_owner_date)
        elif anons:
            self.log_d(pre + 'referenced by {} anons'.format(anons))
            self.write_d(pre_file + 'anon')
            self.data_summary[key]['anon'] += self.cur_d.size
        elif ignored_owners:
            self.log_d(pre + 'referenced by ignored users: {}'.format(commafy(ignored_owners)))
            self.write_d(pre_file + 'ignored_users')
            self.data_summary[key]['ignored_user'] += self.cur_d.size
        elif purged_owners or deleted_owners:
            self.log_d(pre + 'referenced by {} deleted and/or {} purged users: {}'.format(
                len(deleted_owners),
                len(purged_owners),
                commafy(deleted_owners.union(purged_owners)),
            ))
        elif unpurged_hdas_in_purged_histories:
            self.log_d(pre + 'referenced by {} unpurged HDA(s) in purged histories'.format(
                len(unpurged_hdas_in_purged_histories)))
        #elif purged_hdas or null_history_hdas:
        elif null_history_hdas:
            #self.log_d(pre + 'referenced by {} purged HDA(s) and HDA(s) missing history_id: {}'.format(
            self.log_d(pre + 'referenced by HDA(s) missing history_id: {}'.format(
                # len(purged_hdas),
                commafy(null_history_hdas)
            ))
        elif key == 'active_library':
            pre = 'active unreferenced library dataset'
            self.log_d(pre)
            self.write_d(pre.replace(' ', '_'))
            self.data_summary['active_unreferenced_library'] += self.cur_d.size
        elif purged_hdas:
            self.log_d(pre + 'referenced only by purged HDA(s) (recently purged?): {}'.format(commafy(purged_hdas)))
            self.write_d(pre_file + 'purged_hdas')
            self.data_summary[key]['purged_hdas'] += self.cur_d.size
        else:
            raise Exception('{}: no explanation!'.format(self.cur_d.id))
        # write these out regardless
        if null_history_hdas:
            self.write_d(pre_file + 'null_history_hdas', owners=null_history_hdas)
            self.data_summary[key]['null_history_hda'] += self.cur_d.size
        if deleted_owners:
            self.write_d(pre_file + 'deleted_owners', owners=deleted_owners)
            self.data_summary[key]['deleted_user'] += self.cur_d.size
        if purged_owners:
            self.write_d(pre_file + 'purged_owners', owners=purged_owners)
            self.data_summary[key]['purged_user'] += self.cur_d.size
        if unpurged_hdas_in_purged_histories:
            self.write_d(pre_file + 'unpurged_hdas_in_purged_histories', owners=unpurged_hdas_in_purged_histories)
            self.data_summary[key]['unpurged_hda_in_purged_history'] += self.cur_d.size  # overlap
    def walk_datasets(self):
        self.outputs.log('walking datasets...')
        purged_data = 0
        deleted_not_purged_data = 0
        cur = self.conn.cursor('server_side')
        cur.execute(HDA_SQL)
        skipping = False
        assocs = []
        for row in cur:
            d_id, d_size, d_del, d_pur, hda_id, hda_del, hda_pur, h_id, h_del, h_pur, u_id, u_email, u_del, u_pur, u_time = row
            if self.cur_d is None:
                self.cur_d = Dataset(d_id, d_size)
            if d_id == self.cur_d.id and skipping:
                continue
            elif d_id != self.cur_d.id:
                if not skipping:
                    self.process_current_dataset(assocs)
                self.cur_d = Dataset(d_id, d_size)
                skipping = False
                assocs = []
                if not self.outputs.logging_to_terminal:
                    print(self.max_dataset_id - d_id, end='\r', file=sys.stderr)
            if d_size is None or d_size < 0:
                self.outputs.log('WARNING: skipping dataset with invalid size: {}: {}'.format(d_id, d_size))
                skipping = True
                continue
            if d_pur:
                self.log_d('purged', end='\r')
                purged_data += d_size
                skipping = True
            elif d_del:
                self.log_d('deleted but not purged')
                deleted_not_purged_data += d_size
                skipping = True
            if not skipping:
                assocs.append((hda_id, hda_del, hda_pur, h_id, h_del, h_pur, u_id, u_email, u_del, u_pur, u_time))
    def print_and_log(self, msg, log_file):
        print(msg, file=sys.stderr)
        self.outputs[log_file].write(msg + '\n')

    def summarize(self):
        _base = '{:>%d} ({:>%d}) {}'
        lens = []
        for key in KEY_TO_DESC.keys():
            lens.append(max_lengths(self.data_summary[key].values(), str, nice_size))
        base = _base % tuple(map(max, zip(*lens)))
        self.print_and_log(underline('data summary', c='='), 'summary_human')
        for key in sorted(KEY_TO_DESC.keys()):
            self.print_and_log(underline(KEY_TO_DESC[key]), 'summary_human')
            output = 'summary_' + key
            for k, txt in SUMMARY_DESC:
                v = self.data_summary[key][k]
                self.print_and_log(base.format(v, nice_size(v), txt), 'summary_human')
                self.outputs[output].write('{}\t{}\n'.format(v, k))
        self.print_and_log(underline('misc data'), 'summary_human')
        v = self.data_summary['active_unreferenced_library']
        base = _base % (len(str(v)), len(nice_size(v)))
        self.print_and_log(base.format(v, nice_size(v), 'of active, unreferenced library data'), 'summary_human')
        self.print_and_log(underline('data by last user login', c='='), 'summary_human')
        self.print_and_log(underline('per bucket'), 'summary_human')
        base = _base % max_lengths(self.data_by_owner_date.values(), str, nice_size)
        for date in sorted(self.data_by_owner_date.keys()):
            data = self.data_by_owner_date[date]
            self.print_and_log(base.format(data, nice_size(data), 'owned by users that have not logged in since {}'.format(date)), 'summary_human')
            self.outputs['summary_by_date_bucket'].write('{}\t{}\n'.format(data, date))
        total = 0
        totals = []
        for date in sorted(self.data_by_owner_date.keys()):
            data = self.data_by_owner_date[date]
            total += data
            totals.append(total)
        self.print_and_log(underline('cumulative'), 'summary_human')
        base = _base % max_lengths(totals, str, nice_size)
        for date, total in zip(sorted(self.data_by_owner_date.keys()), totals):
            self.print_and_log(base.format(total, nice_size(total), 'owned by users that have not logged in since {}'.format(date)), 'summary_human')
            self.outputs['summary_by_date_cumulative'].write('{}\t{}\n'.format(total, date))


def max_lengths(iterable, *map_funcs):
    lens = [0] * len(map_funcs)
    for o in iterable:
        for i, f in enumerate(map_funcs):
            lens[i] = max(len(f(o)), lens[i])
    return tuple(lens)


def underline(s, c='-'):
    return '\n'.join([s, c * len(s)])


def commafy(iterable):
    return ', '.join([str(i) for i in iterable])
def main(argv):
    args = parse_args(argv[1:])
    if nice_size is None:
        import_galaxy(args)
        args = parse_args(argv[1:])
    properties = app_properties_from_args(args)
    config = Configuration(**properties)
    with Outputs(args.output_dir, args.log_file) as outputs:
        analyzer = DatasetAnalyzer(
            outputs,
            config.database_connection,
            ignored_users_file=args.ignore_users_file,
            ignored_groups=args.ignore_group,
        )
        analyzer.walk_datasets()
        analyzer.summarize()


if __name__ == '__main__':
    main(sys.argv)

#!/usr/bin/env python
"""
Finds datasets on disk that shouldn't be there.
This script is not object-store aware: you simply point it at directories that serve as DiskObjectStore backends
(by default, the directory set by file_path in the Galaxy config).
"""
from __future__ import absolute_import, print_function

import sys
from argparse import ArgumentParser
from errno import EEXIST
from os import (
    getcwd,
    makedirs,
    readlink,
    walk
)
from os.path import (
    exists,
    getsize,
    join,
    islink
)

try:
    from galaxy.config import Configuration
    from galaxy.model import mapping
    from galaxy.util import nice_size
    from galaxy.util.script import app_properties_from_args, populate_config_args
except ImportError:
    Configuration = None
    mapping = None
    nice_size = None
    app_properties_from_args = None
    populate_config_args = None
def parse_args(argv):
    parser = ArgumentParser()
    parser.add_argument('-g', '--galaxy', help='Path to Galaxy')
    parser.add_argument('-o', '--output-dir', default=getcwd(), help='Path to output directory')
    parser.add_argument('-l', '--log-file', default=sys.stderr,
                        help='Path to log file (relative to OUTPUT_DIR if not absolute)')
    if populate_config_args:
        print('reparsing arguments with galaxy.util.script', file=sys.stderr)
        populate_config_args(parser)
        parser.add_argument('dirs', nargs='+', help='Dir(s) to search')
        return parser.parse_args(argv)
    else:
        print('parsing arguments without galaxy.util.script', file=sys.stderr)
        parser.add_argument('dirs', nargs='+', help='Dir(s) to search')
        return parser.parse_known_args(argv)[0]
def import_galaxy(args):
    assert args.galaxy, "'galaxy' not on sys.path, `--galaxy` must be set"
    print('importing galaxy from: {}'.format(args.galaxy), file=sys.stderr)
    sys.path.append(join(args.galaxy, 'lib'))
    global Configuration
    _t = __import__('galaxy.config', globals(), locals(), ['Configuration'], 0)
    Configuration = _t.Configuration
    global mapping
    _t = __import__('galaxy.model', globals(), locals(), ['mapping'], 0)
    mapping = _t.mapping
    global nice_size
    _t = __import__('galaxy.util', globals(), locals(), ['nice_size'], 0)
    nice_size = _t.nice_size
    global app_properties_from_args
    global populate_config_args
    _t = __import__('galaxy.util.script', globals(), locals(), ['app_properties_from_args', 'populate_config_args'], 0)
    app_properties_from_args = _t.app_properties_from_args
    populate_config_args = _t.populate_config_args
    return mapping


def get_model(config):
    print('initializing model', file=sys.stderr)
    return mapping.init(config.file_path, config.database_connection, create_tables=False)
class Total(object):
    def __init__(self, total=0):
        self.__total = total

    def __nonzero__(self):
        return self.__bool__()

    def __bool__(self):
        return bool(self.__total)

    def __iadd__(self, other):
        self.__total += int(other)
        return self

    def __add__(self, other):
        return self.__total + int(other)

    def __int__(self):
        return self.__total

    def __str__(self):
        return str(self.__total)

    @property
    def human(self):
        return nice_size(self.__total)
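# Illustrative behaviour of Total (not from the original gist): it is a small mutable
# accumulator so helpers like size_sum() further down can add one size to several
# running totals in place.
#
#   t = Total()
#   t += 1024           # __iadd__ mutates in place and returns self
#   int(t), bool(t)     # -> (1024, True)
#   t.human             # -> nice_size(1024), e.g. '1.0 KB'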
class Outputs(object):
    def __init__(self, output_dir, log_file):
        self.__output_dir = output_dir
        self.__log_file = log_file
        self.__files = {}
        try:
            makedirs(self.__output_dir)
        except (OSError, IOError) as exc:
            if exc.errno != EEXIST:
                raise
        try:
            self.log('logging to: stderr')
        except AttributeError:
            print('logging to: {}'.format(self.__log_file), file=sys.stderr)
            self.__log_file = open(join(self.__output_dir, self.__log_file), 'w')
            self.log('opened log: {}'.format(self.__log_file.name))

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        for f in self.__files.values():
            f.close()

    def __getitem__(self, name):
        if name not in self.__files:
            self.__files[name] = open(join(self.__output_dir, name + '.txt'), 'w')
        return self.__files[name]

    def log(self, msg):
        print(msg, file=self.__log_file)

    def log_oserror(self, exc):
        self.log('WARNING: {}: [Errno {}] {}: {}'.format(exc.__class__.__name__, exc.errno, exc.strerror, exc.filename))
class BadFinder(object):
    def __init__(self, outputs, cumulative, model, model_object_name='Dataset'):
        self.outputs = outputs
        self.cumulative = cumulative
        self.model = model
        self.context = model.context.current
        self.model_object_name = model_object_name
        if model_object_name == 'Dataset':
            self.model_object = model.Dataset
            self.prefix = 'dataset_'
        elif model_object_name == 'MetadataFile':
            self.model_object = model.MetadataFile
            self.prefix = 'metadata_'
        else:
            raise Exception('Unknown model object: {}'.format(model_object_name))

    def log_summary(self, total, msg, c='-'):
        if total:
            sep = c * 2
            self.outputs.log('{} {}: {} ({}) {} {}'.format(sep, self.cumulative.human, total.human, total, msg, sep))

    def sum_dir(self, root):
        total = Total()
        for dirpath, dirnames, filenames in walk(root):
            for filename, path in iter_contents(dirpath, filenames):
                try:
                    size = size_sum(getsize(path), total)
                    self.outputs.log(
                        '{}: {} ({}) member {}'.format(
                            nice_size(self.cumulative + total), nice_size(size), size, path
                        )
                    )
                except (IOError, OSError) as exc:
                    self.outputs.log_oserror(exc)
        return total

    def check_link_dirs(self, dirs):
        links = []
        for dirname, path in dirs:
            if islink(path):
                self.outputs.log('BAD: dir is link!: {} -> {}'.format(path, readlink(path)))
                self.outputs['link_dirs'].write('{}\n'.format(path))
                links.append(dirname)
        return links

    def check_efp_dirs(self, dirs):
        total = Total()
        for dirname, path in dirs:
            # find all efp dirs, sum all should-be-purged
            dataset_id = get_dataset_id(dirname, self.prefix, '_files')
            d = self.context.query(self.model_object).get(dataset_id)
            if d.purged:
                size = size_sum(self.sum_dir(path), total, self.cumulative)
                self.outputs['unpurged_efp_dirs'].write('{} {}\n'.format(size, path))
                self.log_summary(Total(size), 'contents of unpurged efp dir {}/'.format(path))
        return total

    def check_invalid_dirs(self, dirs):
        total = Total()
        for dirname, path in dirs:
            size = size_sum(self.sum_dir(path), total, self.cumulative)
            self.outputs['invalid_dirs'].write('{} {}\n'.format(size, path))
            self.log_summary(Total(size), 'contents of invalid dir {}/'.format(path))
        return total

    def bad_file(self, path, output='invalid_files'):
        total = Total()
        try:
            size = size_sum(getsize(path), total, self.cumulative)
            self.outputs.log('{}: {} ({}) {} {}'.format(self.cumulative.human, nice_size(size), size, output, path))
            self.outputs[output].write('{} {}\n'.format(size, path))
        except (IOError, OSError) as exc:
            self.outputs.log_oserror(exc)
        return total

    def check_files(self, files):
        total = Total()
        badfiles = []
        for filename, path in files:
            if islink(path):
                self.outputs.log('BAD: file is link!: {} -> {}'.format(path, readlink(path)))
                self.outputs['link_files'].write('{}\n'.format(path))
                continue
            if filename.startswith(self.prefix) and filename.endswith('.dat'):
                try:
                    dataset_id = get_dataset_id(filename, self.prefix, '.dat')
                except ValueError:
                    self.outputs.log('WARNING: file has dataset-like filename but is invalid: {}'.format(path))
                    total += self.bad_file(path)
                    continue
                d = self.context.query(self.model_object).get(dataset_id)
                if d.purged:
                    total += self.bad_file(path, 'unpurged_files')
            else:
                total += self.bad_file(path)
        return total
    def walk_path(self, path):
        cumulative = Total()
        self.outputs.log('walking: {}'.format(path))
        for dirpath, dirnames, filenames in walk(path):
            self.outputs.log('walked to: {}'.format(dirpath))
            total = Total()
            rel = dirpath[len(path.rstrip('/')) + 1:]
            # we should not be walking in any subdirs of hash dirs in the outer walk
            if not dirpath == path and not is_hash_dir(rel):
                self.outputs.log('WARNING: invalid dir (how did we get here?): {}'.format(dirpath))
                self.outputs.log('WARNING: ignoring subdirs: {}'.format(dirnames))
                del dirnames[:]
                continue
            # walk _metadata_files (at the root - otherwise it's invalid) later if it exists
            if dirpath == path:
                try:
                    dirnames.remove('_metadata_files')
                except ValueError:
                    pass
            # remove symlink dirs before dividing
            links = self.check_link_dirs(iter_contents(dirpath, dirnames))
            for link in links:
                dirnames.remove(link)
            # classify subdirs
            hash_subdirs, efp_subdirs, invalid_subdirs = divide_subdirs(dirnames, self.prefix)
            # remove all non-hash dirs from dirnames
            del dirnames[:]
            dirnames.extend(hash_subdirs)
            # find bad stuff
            efp_total = Total()
            dirs_total = Total()
            files_total = Total()
            size_sum(self.check_efp_dirs(iter_contents(dirpath, efp_subdirs)), efp_total, total, cumulative)
            self.log_summary(
                efp_total, 'contents of all unpurged {} efp dirs in {}/'.format(self.model_object_name, dirpath), c="="
            )
            size_sum(self.check_invalid_dirs(iter_contents(dirpath, invalid_subdirs)), dirs_total, total, cumulative)
            self.log_summary(dirs_total, 'contents of all invalid directories in {}/'.format(dirpath), c="=")
            size_sum(self.check_files(iter_contents(dirpath, filenames)), files_total, total, cumulative)
            self.log_summary(files_total, 'all unpurged/invalid files in {}/'.format(dirpath), c="=")
            self.log_summary(total, 'all direct (non-recursive) bad contents of {}/'.format(dirpath), c="#")
        mfpath = join(path, '_metadata_files')
        if exists(mfpath):
            finder = BadFinder(self.outputs, self.cumulative, self.model, model_object_name='MetadataFile')
            finder.walk_path(mfpath)
        self.log_summary(cumulative, 'total bad contents of {}/'.format(path), c="^")
        return cumulative
def size_sum(s, *totals):
    """totals must be mutable (e.g. an instance of Total)"""
    for total in totals:
        total += s
    return s
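# Illustrative use of size_sum() (not from the original gist): it returns the size
# unchanged while adding it to every Total passed in.
#
#   grand, per_dir = Total(), Total()
#   size_sum(2048, per_dir, grand)   # -> 2048; both totals now hold 2048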
def iter_contents(dirpath, contents):
    for content in contents:
        yield (content, join(dirpath, content))


def is_hash_dir(dirpath):
    try:
        for elem in dirpath.split('/'):
            assert int(elem) < 1000
        return True
    except (AssertionError, ValueError):
        pass
    return False


def get_dataset_id(name, prefix, suffix):
    return int(name[len(prefix):][:-len(suffix)])
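# Illustrative examples (not from the original gist):
#
#   is_hash_dir('000/123')                                      -> True
#   is_hash_dir('000/not_a_hash')                               -> False
#   get_dataset_id('dataset_4567.dat', 'dataset_', '.dat')      -> 4567
#   get_dataset_id('dataset_4567_files', 'dataset_', '_files')  -> 4567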
def divide_subdirs(dirnames, prefix):
    # should only be called if at the root of a valid hash dir
    hashdirs = []
    efps = []
    bads = []
    for dirname in dirnames:
        try:
            # hash subdir
            assert int(dirname) < 1000
            hashdirs.append(dirname)
            continue
        except (AssertionError, ValueError):
            pass
        try:
            assert dirname.startswith(prefix) and dirname.endswith('_files')
            get_dataset_id(dirname, prefix, '_files')
            efps.append(dirname)
            continue
        except (AssertionError, ValueError):
            pass
        bads.append(dirname)
    return (sorted(hashdirs), sorted(efps), sorted(bads))
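# Illustrative example (not from the original gist): given dirnames of
# ['042', 'dataset_99_files', 'lost+found'] and prefix 'dataset_', divide_subdirs()
# returns (['042'], ['dataset_99_files'], ['lost+found']).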
def walk_all(dirs, model, outputs):
    total = Total()
    for path in dirs:
        outputs.log('finding files in: {}'.format(path))
        finder = BadFinder(outputs, total, model)
        finder.walk_path(path)
    outputs.log('** {} ({}) total bad contents **'.format(total.human, total))


def main(argv):
    args = parse_args(argv[1:])
    if mapping is None:
        import_galaxy(args)
        args = parse_args(argv[1:])
    properties = app_properties_from_args(args)
    config = Configuration(**properties)
    model = get_model(config)
    with Outputs(args.output_dir, args.log_file) as outputs:
        walk_all(args.dirs, model, outputs)


if __name__ == '__main__':
    main(sys.argv)

#!/usr/bin/env python
""" Relocate files prior to removing them.
Input is from findbad.py (the output files for invalid and unpurged items, not links).
"""
from __future__ import print_function

import sys
from errno import EEXIST
from os import rename, makedirs
from os.path import dirname, join, commonprefix, isdir, exists
from argparse import ArgumentParser

parser = ArgumentParser()
parser.add_argument('-o', '--output-dir', default='/galaxy-repl/main/rmfiles', help='Path to output directory')
parser.add_argument('--dry-run', action='store_true', default=False, help='Show what would be done without doing it')
parser.add_argument('--test', type=int, default=-1, help='Exit after N moves')
parser.add_argument('files', nargs='+', help='File(s) to read')
args = parser.parse_args()

# for dry runs
dirs_made = set()
def make_dstdir(d):
    if not args.dry_run:
        try:
            makedirs(d)
            print('mkdir %s' % d)
        except (IOError, OSError) as exc:
            if exc.errno != EEXIST:
                raise
    elif d not in dirs_made and not isdir(d):
        print('mkdir %s' % d)
        dirs_made.add(d)
def move(src, dst):
    if not args.dry_run:
        try:
            rename(src, dst)
            print('%s -> %s' % (src, dst))
            return True
        except (IOError, OSError) as exc:
            print('%s: [Errno %d] %s: %s' % (exc.__class__.__name__, exc.errno, exc.strerror, src))
    elif exists(src):
        print('%s -> %s' % (src, dst))
        return True
    else:
        print('ENOENT %s' % src)
    return False
def process_file(fname):
    moves_made = 0
    for line in open(fname):
        if args.test >= 0 and moves_made >= args.test:
            print('test mode: stopping after %d moves' % moves_made)
            break
        parts = line.strip().split()
        src = parts[1]
        rel = src[len(commonprefix([args.output_dir, src])):]
        dst = join(args.output_dir, rel)
        make_dstdir(dirname(dst))
        if move(src, dst):
            moves_made += 1
    print('completed %d moves' % moves_made)


if __name__ == '__main__':
    for f in args.files:
        process_file(f)