Find stuff in Galaxy's file_path that shouldn't be there
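Three scripts follow: a database-side audit that classifies unpurged datasets, findbad.py, which audits the files on disk against the model, and a helper that relocates flagged files before deletion. A rough sketch of how a run might chain the first two (script filenames other than findbad.py, and all paths, are placeholders, not part of the gist):

# hypothetical invocations -- only findbad.py's name is known
#   python find_unpurged_datasets.py --galaxy /srv/galaxy -o ./report
#   python findbad.py --galaxy /srv/galaxy -o ./report /srv/galaxy/database/files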
#!/usr/bin/env python
"""
Find datasets in the database that aren't purged and figure out why.
"""
from __future__ import print_function

import sys
from argparse import ArgumentParser
#from calendar import monthrange
from collections import namedtuple
from errno import EEXIST
from os import (
    getcwd,
    makedirs
)
from os.path import (
    join
)

import psycopg2

try:
    from galaxy.config import Configuration
    from galaxy.util import nice_size
    from galaxy.util.script import app_properties_from_args, populate_config_args
except ImportError:
    Configuration = None
    nice_size = None
    app_properties_from_args = None
    populate_config_args = None

KEY_TO_DESC = {
    'active_library': 'active library dataset',
    'inactive_library': 'inactive library dataset',
    'non_library': 'dataset',
}

SUMMARY_DESC = (
    ('anon', 'of anonymous data'),
    ('owned', 'of data owned by active users'),
    ('deleted_user', 'of data owned by deleted users'),
    ('purged_user', 'of data owned by purged users'),
    ('ignored_user', 'of data owned by ignored users'),
    ('purged_hdas', 'of data with only purged HDA(s) (possibly recently purged)'),
    ('null_history_hda', 'of data with no associated history (orphaned data)'),
    ('unpurged_hda_in_purged_history', 'of unpurged HDAs in purged histories (overlaps with date data)'),
)

IGNORED_GROUP_SQL = """
SELECT
    u.id AS u_id,
    u.email AS u_email
FROM
    galaxy_group g
JOIN
    user_group_association uga ON g.id = uga.group_id
JOIN
    galaxy_user u ON uga.user_id = u.id
WHERE
    g.name = %s
ORDER BY
    u.id
"""

LDDA_SQL = """
SELECT
    d.id AS d_id,
    ldda.deleted AS ldda_del
FROM
    dataset d
JOIN
    library_dataset_dataset_association ldda ON d.id = ldda.dataset_id
WHERE
    NOT d.deleted
    AND NOT d.purged
ORDER BY
    d.id
"""

HDA_SQL = """
SELECT
    d.id AS d_id,
    d.total_size AS d_size,
    d.deleted AS d_del,
    d.purged AS d_pur,
    hda.id AS hda_id,
    hda.deleted AS hda_del,
    hda.purged AS hda_pur,
    h.id AS h_id,
    h.deleted AS h_del,
    h.purged AS h_pur,
    u.id AS u_id,
    u.email AS u_email,
    u.deleted AS u_del,
    u.purged AS u_pur,
    u.update_time AS u_time
FROM
    dataset d
LEFT OUTER JOIN
    history_dataset_association hda ON d.id = hda.dataset_id
LEFT OUTER JOIN
    history h ON hda.history_id = h.id
LEFT OUTER JOIN
    galaxy_user u ON h.user_id = u.id
ORDER BY
    d.id
"""
Dataset = namedtuple('Dataset', ['id', 'size'])


def parse_args(argv):
    parser = ArgumentParser()
    parser.add_argument('-g', '--galaxy', help='Path to Galaxy')
    parser.add_argument('--ignore-users-file', help='Path to file containing a list of users to ignore (one email per line)')
    parser.add_argument('--ignore-group', action='append', default=[], help='Ignore any user that is a member of the named group (can be specified multiple times)')
    parser.add_argument('-o', '--output-dir', default=getcwd(), help='Path to output directory')
    parser.add_argument('-l', '--log-file', default=sys.stderr,
                        help='Path to log file (relative to OUTPUT_DIR if not absolute)')
    if populate_config_args:
        print('reparsing arguments with galaxy.util.script', file=sys.stderr)
        populate_config_args(parser)
        return parser.parse_args(argv)
    else:
        print('parsing arguments without galaxy.util.script', file=sys.stderr)
        return parser.parse_known_args(argv)[0]


def import_galaxy(args):
    assert args.galaxy, "'galaxy' not on sys.path, `--galaxy` must be set"
    print('importing galaxy from: {}'.format(args.galaxy), file=sys.stderr)
    sys.path.append(join(args.galaxy, 'lib'))
    global Configuration
    _t = __import__('galaxy.config', globals(), locals(), ['Configuration'], 0)
    Configuration = _t.Configuration
    global nice_size
    _t = __import__('galaxy.util', globals(), locals(), ['nice_size'], 0)
    nice_size = _t.nice_size
    global app_properties_from_args
    global populate_config_args
    _t = __import__('galaxy.util.script', globals(), locals(), ['app_properties_from_args', 'populate_config_args'], 0)
    app_properties_from_args = _t.app_properties_from_args
    populate_config_args = _t.populate_config_args


class Outputs(object):
    def __init__(self, output_dir, log_file):
        self.__output_dir = output_dir
        self.__log_file = log_file
        self.__files = {}
        try:
            makedirs(self.__output_dir)
        except (OSError, IOError) as exc:
            if exc.errno != EEXIST:
                raise
        try:
            self.log('logging to: stderr')
        except AttributeError:
            print('logging to: {}'.format(self.__log_file), file=sys.stderr)
            self.__log_file = open(join(self.__output_dir, self.__log_file), 'w')
            self.log('opened log: {}'.format(self.__log_file.name))

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        for f in self.__files.values():
            f.close()

    def __getitem__(self, name):
        if name not in self.__files:
            self.__files[name] = open(join(self.__output_dir, name + '.txt'), 'w')
        return self.__files[name]

    @property
    def logging_to_terminal(self):
        return self.__log_file == sys.stderr

    def log(self, msg, end=None):
        # maybe not the best to have log messages silently dropped, but these are really only useful on the command line
        if self.logging_to_terminal or end != '\r':
            print(msg, file=self.__log_file, end=end)


class DatasetAnalyzer(object):
    def __init__(self, outputs, conn_str, ignored_users_file=None, ignored_groups=None):
        self.outputs = outputs
        self.__conn_str = conn_str
        self.__ignored_users_file = ignored_users_file
        self.__ignored_groups = ignored_groups or []
        self.__conn = None
        self.__max_d_id = None
        self.__log_base = None
        self.library_active = None
        self.ignored_users = None
        self.cur_d = None
        self.data_summary = {}
        for key in KEY_TO_DESC.keys():
            self.data_summary[key] = {
                'anon': 0,
                'owned': 0,
                'deleted_user': 0,
                'purged_user': 0,
                'ignored_user': 0,
                'purged_hdas': 0,
                'null_history_hda': 0,
                'unpurged_hda_in_purged_history': 0,
            }
        self.data_summary['active_unreferenced_library'] = 0
        self.data_by_owner_date = {}
        self.set_log_base()
        self.set_ignored_users()
        self.collect_library_data()

    @property
    def conn(self):
        if self.__conn is None:
            self.__conn = psycopg2.connect(self.__conn_str)
        return self.__conn

    def log_d(self, msg, end=None):
        self.outputs.log(self.__log_base.format(self.cur_d.id, nice_size(self.cur_d.size), msg), end=end)

    def write_d(self, log_file, owners=None):
        msg = '{}\t{}'.format(self.cur_d.id, int(self.cur_d.size))
        if owners is not None:
            msg += '\t{}'.format(commafy(owners))
        self.outputs[log_file].write(msg + '\n')

    @property
    def max_dataset_id(self):
        if self.__max_d_id is None:
            c = self.conn.cursor()
            c.execute('SELECT max(id) FROM dataset')
            self.__max_d_id = c.fetchone()[0]
        return self.__max_d_id

    def set_log_base(self):
        self.__log_base = '{:>%d}: {:32} {}' % (len(str(self.max_dataset_id)))
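
    # A worked example of the template built above: with a max dataset id of
    # 123456, set_log_base() yields '{:>6}: {:32} {}', so ids right-align and
    # the size/message columns line up in the log.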
    def set_ignored_users(self):
        def _add_user():
            self.outputs.log('{}: {}'.format(*row))
            self.ignored_users.add(int(row[0]))
        self.outputs.log('collecting ignored users...')
        cur = self.conn.cursor()
        self.ignored_users = set()
        for name in self.__ignored_groups:
            cur.execute(IGNORED_GROUP_SQL, (name,))
            for row in cur:
                _add_user()
        if self.__ignored_users_file is not None:
            with open(self.__ignored_users_file) as fh:
                for line in fh:
                    email = line.strip()
                    # psycopg2 uses %s placeholders (not ?) and execute() returns None,
                    # so fetch the row from the cursor
                    cur.execute('SELECT id, email FROM galaxy_user WHERE email = %s', (email,))
                    row = cur.fetchone()
                    if row is None:
                        self.outputs.log('WARNING: no user with email: {}'.format(email))
                        continue
                    _add_user()
        self.outputs.log('done')
    def collect_library_data(self):
        self.outputs.log('collecting library datasets...', end='')
        cur = self.conn.cursor()
        cur.execute(LDDA_SQL)
        self.library_active = {}
        for row in cur:
            d_id, ldda_del = row
            # if one of this dataset's LDDAs was deleted but a later one was not, this dataset is active
            self.library_active[d_id] = self.library_active.get(d_id) or not ldda_del
        self.outputs.log('done')
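
    # The or-fold above marks a dataset "library active" if any of its LDDAs is
    # undeleted: e.g. for rows (deleted=True) then (deleted=False), the value
    # goes None or False -> False, then False or True -> True.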
    def add_dataset_to_date_bucket(self, newest_owner_date):
        if newest_owner_date not in self.data_by_owner_date:
            self.data_by_owner_date[newest_owner_date] = 0
        self.data_by_owner_date[newest_owner_date] += self.cur_d.size

    def process_current_dataset(self, assocs):
        anons = 0
        owners = set()
        ignored_owners = set()
        purged_hdas = set()
        null_history_hdas = set()
        purged_owners = set()
        deleted_owners = set()
        unpurged_hdas_in_purged_histories = set()
        newest_owner_date = None
        for assoc in assocs:
            hda_id, hda_del, hda_pur, h_id, h_del, h_pur, u_id, u_email, u_del, u_pur, u_time = assoc
            if h_pur and not hda_pur:
                unpurged_hdas_in_purged_histories.add(hda_id)
            if hda_pur:
                purged_hdas.add(hda_id)
            # hda_del is irrelevant since it still counts against the user
            elif u_id and u_pur:
                purged_owners.add(u_email)
            elif u_id and u_del:
                deleted_owners.add(u_email)
            elif u_id and u_id in self.ignored_users:
                ignored_owners.add(u_email)
            elif u_id:
                #owners.add((u_id, u_email, u_time))
                owners.add(u_email)
                newest_owner_date = max(newest_owner_date or u_time.date(), u_time.date())
            elif h_id is not None:
                # TODO: could be the same one, we'd need session IDs to check. but we don't care
                anons += 1
            elif h_id is None:
                null_history_hdas.add(hda_id)
            else:
                raise Exception('{}: broken HDA: {}'.format(self.cur_d.id, assoc))
        # runs from the 1st of the month
        if newest_owner_date and newest_owner_date.day != 1:
            month = newest_owner_date.month % 12 + 1
            year = newest_owner_date.year
            if month == 1:
                year += 1
            newest_owner_date = newest_owner_date.replace(year=year, month=month, day=1)
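            # e.g. an owner last active 2018-03-15 buckets to 2018-04-01, and
            # 2017-12-02 rolls the year over to 2018-01-01 (month % 12 + 1
            # maps December to January)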
        # runs from the last day of the month
        #if newest_owner_date:
        #    newest_owner_date = newest_owner_date.replace(day=monthrange(newest_owner_date.year, newest_owner_date.month)[1])
        key = 'non_library'
        if self.library_active.get(self.cur_d.id) is True:
            key = 'active_library'
        elif self.library_active.get(self.cur_d.id) is False:
            key = 'inactive_library'
        pre = '{} '.format(KEY_TO_DESC[key])
        pre_file = pre.replace(' ', '_')
        if owners and anons:
            self.log_d(pre + 'referenced by {} anons and: {}'.format(anons, commafy(owners)))
            self.write_d(pre_file + 'owned', owners=owners)
            self.data_summary[key]['owned'] += self.cur_d.size
            if key != 'active_library':
                self.add_dataset_to_date_bucket(newest_owner_date)
        elif owners:
            self.log_d(pre + 'referenced by: {}'.format(commafy(owners)))
            self.write_d(pre_file + 'owned', owners=owners)
            self.data_summary[key]['owned'] += self.cur_d.size
            if key != 'active_library':
                self.add_dataset_to_date_bucket(newest_owner_date)
        elif anons:
            self.log_d(pre + 'referenced by {} anons'.format(anons))
            self.write_d(pre_file + 'anon')
            self.data_summary[key]['anon'] += self.cur_d.size
        elif ignored_owners:
            self.log_d(pre + 'referenced by ignored users: {}'.format(commafy(ignored_owners)))
            self.write_d(pre_file + 'ignored_users')
            self.data_summary[key]['ignored_user'] += self.cur_d.size
        elif purged_owners or deleted_owners:
            self.log_d(pre + 'referenced by {} deleted and/or {} purged users: {}'.format(
                len(deleted_owners),
                len(purged_owners),
                deleted_owners.union(purged_owners),
            ))
        elif unpurged_hdas_in_purged_histories:
            self.log_d(pre + 'referenced by {} unpurged HDA(s) in purged histories'.format(
                len(unpurged_hdas_in_purged_histories)))
        #elif purged_hdas or null_history_hdas:
        elif null_history_hdas:
            #self.log_d(pre + 'referenced by {} purged HDA(s) and HDA(s) missing history_id: {}'.format(
            self.log_d(pre + 'referenced by HDA(s) missing history_id: {}'.format(
                #len(purged_hdas),
                commafy(null_history_hdas)
            ))
        elif key == 'active_library':
            pre = 'active unreferenced library dataset'
            self.log_d(pre)
            self.write_d(pre.replace(' ', '_'))
            self.data_summary['active_unreferenced_library'] += self.cur_d.size
        elif purged_hdas:
            self.log_d(pre + 'referenced only by purged HDA(s) (recently purged?): {}'.format(commafy(purged_hdas)))
            self.write_d(pre_file + 'purged_hdas')
            self.data_summary[key]['purged_hdas'] += self.cur_d.size
        else:
            raise Exception('{}: no explanation!'.format(self.cur_d.id))
        # write these out regardless
        if null_history_hdas:
            self.write_d(pre_file + 'null_history_hdas', owners=null_history_hdas)
            self.data_summary[key]['null_history_hda'] += self.cur_d.size
        if deleted_owners:
            self.write_d(pre_file + 'deleted_owners', owners=deleted_owners)
            self.data_summary[key]['deleted_user'] += self.cur_d.size
        if purged_owners:
            self.write_d(pre_file + 'purged_owners', owners=purged_owners)
            self.data_summary[key]['purged_user'] += self.cur_d.size
        if unpurged_hdas_in_purged_histories:
            self.write_d(pre_file + 'unpurged_hdas_in_purged_histories', owners=unpurged_hdas_in_purged_histories)
            self.data_summary[key]['unpurged_hda_in_purged_history'] += self.cur_d.size  # overlap

    def walk_datasets(self):
        self.outputs.log('walking datasets...')
        purged_data = 0
        deleted_not_purged_data = 0
        cur = self.conn.cursor('server_side')
        cur.execute(HDA_SQL)
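        # 'server_side' names a psycopg2 server-side cursor, so the big join is
        # streamed rather than fetched into memory; since rows are ordered by
        # dataset id, each dataset's rows arrive consecutively and the
        # current-id checks below can group them on the fly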
        skipping = False
        assocs = []
        for row in cur:
            d_id, d_size, d_del, d_pur, hda_id, hda_del, hda_pur, h_id, h_del, h_pur, u_id, u_email, u_del, u_pur, u_time = row
            if self.cur_d is None:
                self.cur_d = Dataset(d_id, d_size)
            if d_id == self.cur_d.id and skipping:
                continue
            elif d_id != self.cur_d.id:
                if not skipping:
                    self.process_current_dataset(assocs)
                self.cur_d = Dataset(d_id, d_size)
                skipping = False
                assocs = []
                if not self.outputs.logging_to_terminal:
                    print(self.max_dataset_id - d_id, end='\r', file=sys.stderr)
            if d_size is None or d_size < 0:
                self.outputs.log('WARNING: skipping dataset with invalid size: {}: {}'.format(d_id, d_size))
                skipping = True
                continue
            if d_pur:
                self.log_d('purged', end='\r')
                purged_data += d_size
                skipping = True
            elif d_del:
                self.log_d('deleted but not purged')
                deleted_not_purged_data += d_size
                skipping = True
            if not skipping:
                assocs.append((hda_id, hda_del, hda_pur, h_id, h_del, h_pur, u_id, u_email, u_del, u_pur, u_time))
        # the loop only flushes a dataset when the id changes, so process the final group here
        if self.cur_d is not None and not skipping and assocs:
            self.process_current_dataset(assocs)
        self.outputs.log('purged data: {}'.format(nice_size(purged_data)))
        self.outputs.log('deleted but not purged data: {}'.format(nice_size(deleted_not_purged_data)))
    def print_and_log(self, msg, log_file):
        print(msg, file=sys.stderr)
        self.outputs[log_file].write(msg + '\n')

    def summarize(self):
        _base = '{:>%d} ({:>%d}) {}'
        lens = []
        for key in KEY_TO_DESC.keys():
            lens.append(max_lengths(self.data_summary[key].values(), str, nice_size))
        base = _base % tuple(map(max, zip(*lens)))
        self.print_and_log(underline('data summary', c='='), 'summary_human')
        for key in sorted(KEY_TO_DESC.keys()):
            self.print_and_log(underline(KEY_TO_DESC[key]), 'summary_human')
            output = 'summary_' + key
            for k, txt in SUMMARY_DESC:
                v = self.data_summary[key][k]
                self.print_and_log(base.format(v, nice_size(v), txt), 'summary_human')
                self.outputs[output].write('{}\t{}\n'.format(v, k))
        self.print_and_log(underline('misc data'), 'summary_human')
        v = self.data_summary['active_unreferenced_library']
        base = _base % (len(str(v)), len(nice_size(v)))
        self.print_and_log(base.format(v, nice_size(v), 'of active, unreferenced library data'), 'summary_human')
        self.print_and_log(underline('data by last user login', c='='), 'summary_human')
        self.print_and_log(underline('per bucket'), 'summary_human')
        base = _base % max_lengths(self.data_by_owner_date.values(), str, nice_size)
        for date in sorted(self.data_by_owner_date.keys()):
            data = self.data_by_owner_date[date]
            self.print_and_log(base.format(data, nice_size(data), 'owned by users that have not logged in since {}'.format(date)), 'summary_human')
            self.outputs['summary_by_date_bucket'].write('{}\t{}\n'.format(data, date))
        total = 0
        totals = []
        for date in sorted(self.data_by_owner_date.keys()):
            data = self.data_by_owner_date[date]
            total += data
            totals.append(total)
        self.print_and_log(underline('cumulative'), 'summary_human')
        base = _base % max_lengths(totals, str, nice_size)
        for date, total in zip(sorted(self.data_by_owner_date.keys()), totals):
            self.print_and_log(base.format(total, nice_size(total), 'owned by users that have not logged in since {}'.format(date)), 'summary_human')
            self.outputs['summary_by_date_cumulative'].write('{}\t{}\n'.format(total, date))


def max_lengths(iterable, *map_funcs):
    lens = [0] * len(map_funcs)
    for o in iterable:
        for i, f in enumerate(map_funcs):
            lens[i] = max(len(f(o)), lens[i])
    return tuple(lens)


def underline(s, c='-'):
    return '\n'.join([s, c * len(s)])


def commafy(iterable):
    return ', '.join([str(i) for i in iterable])


def main(argv):
    args = parse_args(argv[1:])
    if nice_size is None:
        import_galaxy(args)
        args = parse_args(argv[1:])
    properties = app_properties_from_args(args)
    config = Configuration(**properties)
    with Outputs(args.output_dir, args.log_file) as outputs:
        analyzer = DatasetAnalyzer(
            outputs,
            config.database_connection,
            ignored_users_file=args.ignore_users_file,
            ignored_groups=args.ignore_group,
        )
        analyzer.walk_datasets()
        analyzer.summarize()


if __name__ == '__main__':
    main(sys.argv)
#!/usr/bin/env python
"""
Finds datasets on disk that shouldn't be there.

This script is not object-store aware. You just point it at directories that are
DiskObjectStore backends (e.g. file_path in the Galaxy config by default).
"""
from __future__ import absolute_import, print_function

import sys
from argparse import ArgumentParser
from errno import EEXIST
from os import (
    getcwd,
    makedirs,
    readlink,
    walk
)
from os.path import (
    exists,
    getsize,
    islink,
    join
)

try:
    from galaxy.config import Configuration
    from galaxy.model import mapping
    from galaxy.util import nice_size
    from galaxy.util.script import app_properties_from_args, populate_config_args
except ImportError:
    Configuration = None
    mapping = None
    nice_size = None
    app_properties_from_args = None
    populate_config_args = None


def parse_args(argv):
    parser = ArgumentParser()
    parser.add_argument('-g', '--galaxy', help='Path to Galaxy')
    parser.add_argument('-o', '--output-dir', default=getcwd(), help='Path to output directory')
    parser.add_argument('-l', '--log-file', default=sys.stderr,
                        help='Path to log file (relative to OUTPUT_DIR if not absolute)')
    if populate_config_args:
        print('reparsing arguments with galaxy.util.script', file=sys.stderr)
        populate_config_args(parser)
        parser.add_argument('dirs', nargs='+', help='Dir(s) to search')
        return parser.parse_args(argv)
    else:
        print('parsing arguments without galaxy.util.script', file=sys.stderr)
        parser.add_argument('dirs', nargs='+', help='Dir(s) to search')
        return parser.parse_known_args(argv)[0]


def import_galaxy(args):
    assert args.galaxy, "'galaxy' not on sys.path, `--galaxy` must be set"
    print('importing galaxy from: {}'.format(args.galaxy), file=sys.stderr)
    sys.path.append(join(args.galaxy, 'lib'))
    global Configuration
    _t = __import__('galaxy.config', globals(), locals(), ['Configuration'], 0)
    Configuration = _t.Configuration
    global mapping
    _t = __import__('galaxy.model', globals(), locals(), ['mapping'], 0)
    mapping = _t.mapping
    global nice_size
    _t = __import__('galaxy.util', globals(), locals(), ['nice_size'], 0)
    nice_size = _t.nice_size
    global app_properties_from_args
    global populate_config_args
    _t = __import__('galaxy.util.script', globals(), locals(), ['app_properties_from_args', 'populate_config_args'], 0)
    app_properties_from_args = _t.app_properties_from_args
    populate_config_args = _t.populate_config_args
    return mapping


def get_model(config):
    print('initializing model', file=sys.stderr)
    return mapping.init(config.file_path, config.database_connection, create_tables=False)


class Total(object):
    def __init__(self, total=0):
        self.__total = total

    def __nonzero__(self):
        return self.__bool__()

    def __bool__(self):
        return bool(self.__total)

    def __iadd__(self, other):
        self.__total += int(other)
        return self

    def __add__(self, other):
        return self.__total + int(other)

    def __int__(self):
        return self.__total

    def __str__(self):
        return str(self.__total)

    @property
    def human(self):
        return nice_size(self.__total)
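
# Total is a mutable accumulator, so size_sum() (defined below) can bump
# several running totals in one call. A quick sketch of the behavior:
#   grand, per_dir = Total(), Total()
#   size_sum(1024, grand, per_dir)  # both now satisfy int(t) == 1024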
class Outputs(object):
    def __init__(self, output_dir, log_file):
        self.__output_dir = output_dir
        self.__log_file = log_file
        self.__files = {}
        try:
            makedirs(self.__output_dir)
        except (OSError, IOError) as exc:
            if exc.errno != EEXIST:
                raise
        try:
            self.log('logging to: stderr')
        except AttributeError:
            print('logging to: {}'.format(self.__log_file), file=sys.stderr)
            self.__log_file = open(join(self.__output_dir, self.__log_file), 'w')
            self.log('opened log: {}'.format(self.__log_file.name))

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        for f in self.__files.values():
            f.close()

    def __getitem__(self, name):
        if name not in self.__files:
            self.__files[name] = open(join(self.__output_dir, name + '.txt'), 'w')
        return self.__files[name]

    def log(self, msg):
        print(msg, file=self.__log_file)

    def log_oserror(self, exc):
        self.log('WARNING: {}: [Errno {}] {}: {}'.format(exc.__class__.__name__, exc.errno, exc.strerror, exc.filename))


class BadFinder(object):
    def __init__(self, outputs, cumulative, model, model_object_name='Dataset'):
        self.outputs = outputs
        self.cumulative = cumulative
        self.model = model
        self.context = model.context.current
        self.model_object_name = model_object_name
        if model_object_name == 'Dataset':
            self.model_object = model.Dataset
            self.prefix = 'dataset_'
        elif model_object_name == 'MetadataFile':
            self.model_object = model.MetadataFile
            self.prefix = 'metadata_'
        else:
            raise Exception('Unknown model object: {}'.format(model_object_name))

    def log_summary(self, total, msg, c='-'):
        if total:
            sep = c * 2
            self.outputs.log('{} {}: {} ({}) {} {}'.format(sep, self.cumulative.human, total.human, total, msg, sep))
    def sum_dir(self, root):
        total = Total()
        for dirpath, dirnames, filenames in walk(root):
            for filename, path in iter_contents(dirpath, filenames):
                try:
                    size = size_sum(getsize(path), total)
                    self.outputs.log(
                        '{}: {} ({}) member {}'.format(
                            nice_size(self.cumulative + total), nice_size(size), size, path
                        )
                    )
                except (IOError, OSError) as exc:
                    self.outputs.log_oserror(exc)
        return total

    def check_link_dirs(self, dirs):
        links = []
        for dirname, path in dirs:
            if islink(path):
                self.outputs.log('BAD: dir is link!: {} -> {}'.format(path, readlink(path)))
                self.outputs['link_dirs'].write('{}\n'.format(path))
                links.append(dirname)
        return links
    def check_efp_dirs(self, dirs):
        total = Total()
        for dirname, path in dirs:
            # find all efp dirs, sum all should-be-purged
            dataset_id = get_dataset_id(dirname, self.prefix, '_files')
            d = self.context.query(self.model_object).get(dataset_id)
            if d is None:
                # an efp dir named for an id with no database row is invalid
                size = size_sum(self.sum_dir(path), total, self.cumulative)
                self.outputs['invalid_dirs'].write('{} {}\n'.format(size, path))
                self.log_summary(Total(size), 'contents of efp dir with unknown id {}/'.format(path))
            elif d.purged:
                size = size_sum(self.sum_dir(path), total, self.cumulative)
                self.outputs['unpurged_efp_dirs'].write('{} {}\n'.format(size, path))
                self.log_summary(Total(size), 'contents of unpurged efp dir {}/'.format(path))
        return total
    def check_invalid_dirs(self, dirs):
        total = Total()
        for dirname, path in dirs:
            size = size_sum(self.sum_dir(path), total, self.cumulative)
            self.outputs['invalid_dirs'].write('{} {}\n'.format(size, path))
            self.log_summary(Total(size), 'contents of invalid dir {}/'.format(path))
        return total

    def bad_file(self, path, output='invalid_files'):
        total = Total()
        try:
            size = size_sum(getsize(path), total, self.cumulative)
            self.outputs.log('{}: {} ({}) {} {}'.format(self.cumulative.human, nice_size(size), size, output, path))
            self.outputs[output].write('{} {}\n'.format(size, path))
        except (IOError, OSError) as exc:
            self.outputs.log_oserror(exc)
        return total
    def check_files(self, files):
        total = Total()
        for filename, path in files:
            if islink(path):
                self.outputs.log('BAD: file is link!: {} -> {}'.format(path, readlink(path)))
                self.outputs['link_files'].write('{}\n'.format(path))
                continue
            if filename.startswith(self.prefix) and filename.endswith('.dat'):
                try:
                    dataset_id = get_dataset_id(filename, self.prefix, '.dat')
                except ValueError:
                    self.outputs.log('WARNING: file has dataset-like filename but is invalid: {}'.format(path))
                    total += self.bad_file(path)
                    continue
                d = self.context.query(self.model_object).get(dataset_id)
                if d is None:
                    # a file named for an id with no database row shouldn't be on disk at all
                    total += self.bad_file(path)
                elif d.purged:
                    total += self.bad_file(path, 'unpurged_files')
            else:
                total += self.bad_file(path)
        return total
    def walk_path(self, path):
        cumulative = Total()
        self.outputs.log('walking: {}'.format(path))
        for dirpath, dirnames, filenames in walk(path):
            self.outputs.log('walked to: {}'.format(dirpath))
            total = Total()
            rel = dirpath[len(path.rstrip('/')) + 1:]
            # we should not be walking in any subdirs of hash dirs in the outer walk
            if not dirpath == path and not is_hash_dir(rel):
                self.outputs.log('WARNING: invalid dir (how did we get here?): {}'.format(dirpath))
                self.outputs.log('WARNING: ignoring subdirs: {}'.format(dirnames))
                del dirnames[:]
                continue
            # walk _metadata_files (at the root - otherwise it's invalid) later if it exists
            if dirpath == path:
                try:
                    dirnames.remove('_metadata_files')
                except ValueError:
                    pass
            # remove symlink dirs before dividing
            links = self.check_link_dirs(iter_contents(dirpath, dirnames))
            for link in links:
                dirnames.remove(link)
            # classify subdirs
            hash_subdirs, efp_subdirs, invalid_subdirs = divide_subdirs(dirnames, self.prefix)
            # remove all non-hash dirs from dirnames
            del dirnames[:]
            dirnames.extend(hash_subdirs)
            # find bad stuff
            efp_total = Total()
            dirs_total = Total()
            files_total = Total()
            size_sum(self.check_efp_dirs(iter_contents(dirpath, efp_subdirs)), efp_total, total, cumulative)
            self.log_summary(
                efp_total, 'contents of all unpurged {} efp dirs in {}/'.format(self.model_object_name, dirpath), c="="
            )
            size_sum(self.check_invalid_dirs(iter_contents(dirpath, invalid_subdirs)), dirs_total, total, cumulative)
            self.log_summary(dirs_total, 'contents of all invalid directories in {}/'.format(dirpath), c="=")
            size_sum(self.check_files(iter_contents(dirpath, filenames)), files_total, total, cumulative)
            self.log_summary(files_total, 'all unpurged/invalid files in {}/'.format(dirpath), c="=")
            self.log_summary(total, 'all direct (non-recursive) bad contents of {}/'.format(dirpath), c="#")
        mfpath = join(path, '_metadata_files')
        if exists(mfpath):
            finder = BadFinder(self.outputs, self.cumulative, self.model, model_object_name='MetadataFile')
            finder.walk_path(mfpath)
        self.log_summary(cumulative, 'total bad contents of {}/'.format(path), c="^")
        return cumulative


def size_sum(s, *totals):
    """totals must be mutable (e.g. an instance of Total)"""
    for total in totals:
        total += s
    return s


def iter_contents(dirpath, contents):
    for content in contents:
        yield (content, join(dirpath, content))


def is_hash_dir(dirpath):
    try:
        for elem in dirpath.split('/'):
            assert int(elem) < 1000
        return True
    except (AssertionError, ValueError):
        pass
    return False
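
# Galaxy's DiskObjectStore shards files into numeric directories of up to 1000
# entries per level (galaxy.util.directory_hash_id), so e.g. dataset 1234567 is
# expected at 001/234/dataset_1234567.dat; is_hash_dir() accepts any relative
# path whose components all parse as ints below 1000.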
def get_dataset_id(name, prefix, suffix):
    return int(name[len(prefix):][:-len(suffix)])


def divide_subdirs(dirnames, prefix):
    # should only be called if at the root of a valid hash dir
    hashdirs = []
    efps = []
    bads = []
    for dirname in dirnames:
        try:
            # hash subdir
            assert int(dirname) < 1000
            hashdirs.append(dirname)
            continue
        except (AssertionError, ValueError):
            pass
        try:
            assert dirname.startswith(prefix) and dirname.endswith('_files')
            get_dataset_id(dirname, prefix, '_files')
            efps.append(dirname)
            continue
        except (AssertionError, ValueError):
            pass
        bads.append(dirname)
    return (sorted(hashdirs), sorted(efps), sorted(bads))
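
# e.g. divide_subdirs(['000', 'dataset_42_files', 'tmp'], 'dataset_') returns
# (['000'], ['dataset_42_files'], ['tmp']): hash dirs keep being walked, efp
# dirs are checked against the model, and everything else is reported invalid.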
def walk_all(dirs, model, outputs):
    total = Total()
    for path in dirs:
        outputs.log('finding files in: {}'.format(path))
        finder = BadFinder(outputs, total, model)
        finder.walk_path(path)
    outputs.log('** {} ({}) total bad contents **'.format(total.human, total))


def main(argv):
    args = parse_args(argv[1:])
    if mapping is None:
        import_galaxy(args)
        args = parse_args(argv[1:])
    properties = app_properties_from_args(args)
    config = Configuration(**properties)
    model = get_model(config)
    with Outputs(args.output_dir, args.log_file) as outputs:
        walk_all(args.dirs, model, outputs)


if __name__ == '__main__':
    main(sys.argv)
#!/usr/bin/env python
"""
Relocate files prior to removing them.

Input is from findbad.py (the output files for invalid and unpurged items, not links).
"""
from __future__ import print_function

from argparse import ArgumentParser
from errno import EEXIST
from os import makedirs, rename
from os.path import commonprefix, dirname, exists, isdir, join

parser = ArgumentParser()
parser.add_argument('-o', '--output-dir', default='/galaxy-repl/main/rmfiles', help='Path to output directory')
parser.add_argument('--dry-run', action='store_true', default=False, help='Show what would be done without doing it')
parser.add_argument('--test', type=int, default=-1, help='Exit after N moves')
parser.add_argument('files', nargs='+', help='File(s) to read')
args = parser.parse_args()

# for dry runs
dirs_made = set()


def make_dstdir(d):
    if not args.dry_run:
        try:
            makedirs(d)
            print('mkdir %s' % d)
        except (IOError, OSError) as exc:
            if exc.errno != EEXIST:
                raise
    elif d not in dirs_made and not isdir(d):
        print('mkdir %s' % d)
        dirs_made.add(d)


def move(src, dst):
    if not args.dry_run:
        try:
            rename(src, dst)
            print('%s -> %s' % (src, dst))
            return True
        except (IOError, OSError) as exc:
            print('%s: [Errno %d] %s: %s' % (exc.__class__.__name__, exc.errno, exc.strerror, src))
    elif exists(src):
        print('%s -> %s' % (src, dst))
        return True
    else:
        print('ENOENT %s' % src)
    return False


def process_file(fname):
    moves_made = 0
    with open(fname) as fh:
        for line in fh:
            if args.test >= 0 and moves_made >= args.test:
                print('test mode: stopping after %d moves' % moves_made)
                break
            parts = line.strip().split()
            src = parts[1]
            rel = src[len(commonprefix([args.output_dir, src])):]
            dst = join(args.output_dir, rel)
            make_dstdir(dirname(dst))
            if move(src, dst):
                moves_made += 1
    print('completed %d moves' % moves_made)


if __name__ == '__main__':
    for f in args.files:
        process_file(f)
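
# A cautious rollout might dry-run first, then cap the first live pass; the
# input filenames below are ones findbad.py actually creates, but the script
# name is a placeholder:
#   python relocate.py --dry-run invalid_files.txt unpurged_files.txt
#   python relocate.py --test 10 invalid_files.txt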