Skip to content

Instantly share code, notes, and snippets.

@david-caro
Last active September 3, 2018 12:36
Show Gist options
  • Save david-caro/9984dec8d5cf96734edcd62b9368dbc8 to your computer and use it in GitHub Desktop.
Save david-caro/9984dec8d5cf96734edcd62b9368dbc8 to your computer and use it in GitHub Desktop.
from invenio_workflows import workflow_object_class, ObjectStatus
from collections import Counter
def _get_nice_message(wflw):
if 'Found workflows in ERROR or INITIAL state:' in wflw.extra_data['_error_msg']:
return wflw.extra_data['_error_msg'].splitlines()[-1].rsplit(':', 1)[0]
return wflw.extra_data['_error_msg'].splitlines()[-1]
def get_errors_by_type():
errors = workflow_object_class.query(status=ObjectStatus.ERROR)
by_type = {}
for error in errors:
by_type[_get_nice_message(error)] = by_type.get(_get_nice_message(error), []) + [error]
return by_type
def get_errors_count(errors_dict):
return sorted([(len(errs), msg) for (msg, errs) in errors_dict.items()])
def show_error_counts(errors_dict):
for count, msg in get_errors_count(errors_dict):
print count, msg
def restart_errored(errors_list, step='restart_task'):
results = []
for error in errors_list:
results.append(error.continue_workflow(step, delayed=True))
return results
def get_result_counts(results):
return Counter(res.status for res in results)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment