Created
November 27, 2013 09:48
-
-
Save mpkocher/7673190 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import abc | |
import functools | |
import logging | |
import warnings | |
import operator | |
# Module-level logger, named after this module via __name__.
log = logging.getLogger(__name__)
class RuleBase(abc.ABCMeta("_RuleBaseABC", (object,), {})):
    """
    Abstract base class for a rule that judges a metric's recent values.

    Concrete rules must implement :meth:`apply`.

    The base class is produced by calling ``abc.ABCMeta`` directly so that the
    abstract-method check is active on both Python 2 and Python 3; a
    ``__metaclass__`` class attribute is silently ignored by Python 3.
    """

    @abc.abstractmethod
    def apply(self, values):
        """
        The values should be given as a list of [(changelist, value),...]
        where the list is ordered by changelist.
        """
        return ""

    def __str__(self):
        """Return the class name followed by ``name:value`` for each instance attribute."""
        # dict.items() exists on Python 2 and 3; iteritems() is Python 2 only.
        s = " ".join([":".join([n, str(a)]) for n, a in self.__dict__.items()])
        return "{k} {n}".format(k=self.__class__.__name__, n=s)

    def __repr__(self):
        """Return the ``str()`` form wrapped in angle brackets."""
        return " ".join(['<', str(self), '>'])
class ExactRule(RuleBase):
    """
    Rule that passes only when the two most recent values are identical.

    Only really useful for integer metrics.
    """

    def __init__(self):
        # Stateless rule: there is nothing to configure.
        pass

    def apply(self, values):
        """
        :param values: list of (changelist, value) pairs; the last two are compared
        :return: True when the last value minus the one before it equals zero
        """
        observed = [entry[1] for entry in values]
        change = observed[-1] - observed[-2]
        return change == 0
class RoundRule(RuleBase):
    """
    Rule that passes when the change between the two most recent values
    rounds to zero at a given number of decimal places. Useful for float values.
    """

    def __init__(self, places):
        """
        :param places: number of decimal places passed to ``round``
        """
        self.places = places

    def apply(self, values):
        """
        :param values: list of (changelist, value) pairs; the last two are compared
        :return: True when the rounded absolute difference equals 0.0
        """
        series = [pair[1] for pair in values]
        change = series[-1] - series[-2]
        rounded_change = round(abs(change), self.places)
        return rounded_change == 0.0
class AbsoluteDiffRule(RuleBase):
    """Rule that tolerates a change of at most +/- ``delta`` between the last two values."""

    def __init__(self, delta):
        """
        :param delta: largest allowed absolute difference (inclusive)
        """
        self.delta = delta

    def apply(self, values):
        """
        :param values: list of (changelist, value) pairs; the last two are compared
        :return: True when the absolute difference does not exceed ``delta``
        """
        readings = [pair[1] for pair in values]
        magnitude = abs(readings[-1] - readings[-2])
        return magnitude <= self.delta
class DiffPercentRule(RuleBase):
    """
    Rule that passes when the last two values differ by at most ``percent``
    percent, measured relative to the magnitude of the most recent value.
    """

    def __init__(self, percent):
        """
        :param percent: largest allowed change in percent (inclusive), e.g. ``5.0``
        """
        self.percent = percent

    def apply(self, values):
        """
        :param values: list of (changelist, value) pairs ordered by changelist;
            the last two entries are compared
        :return: True when the relative change is within ``percent``
        """
        items = [operator.getitem(x, 1) for x in values]
        latest = items[-1]
        dx = latest - items[-2]
        if dx == 0:
            return True
        if latest == 0:
            # A non-zero change measured against zero is unbounded; treat it
            # as infinite instead of raising ZeroDivisionError.
            p = float("inf")
        else:
            # Divide by the magnitude: a negative latest value would otherwise
            # yield a negative percentage that always passes.
            p = (float(abs(dx)) / abs(latest)) * 100
        return p <= self.percent
class TrendModelBase(object):
    """Base for trend models; the class body serves as a container for rule attributes."""

    @classmethod
    def get_metric_rules(cls):
        """
        Collect every class attribute that is a RuleBase instance.

        The class is only a husk that stores the metrics that are mixed in.

        :return: dict of {metric_name: Rule}
        """
        rules = {}
        for attr_name in dir(cls):
            candidate = getattr(cls, attr_name)
            if isinstance(candidate, RuleBase):
                rules[attr_name] = candidate
        return rules
class _MixinMetaKlass(type): | |
def __new__(cls, name, parents, dct): | |
for k, v in dct.iteritems(): | |
if not k.startswith('_'): | |
# Only allow class vars to be instances of Rules | |
if not isinstance(v, RuleBase): | |
c_ = dict(c=cls.__name__, t=type(v), n=k) | |
raise TypeError("Rules defined in {c} must be of type RuleBase, got {n} with type {t}.".format(**c_)) | |
return super(_MixinMetaKlass, cls).__new__(cls, name, parents, dct) | |
class ModelMixinBase(object): | |
__metaclass__ = _MixinMetaKlass | |
class FilterModelMixin(ModelMixinBase):
    """Rules for the filter group of metrics; each one must match exactly."""

    # Each metric gets its own rule instance.
    nmovies = ExactRule()
    ncells = ExactRule()
    reads_n_pre_filter = ExactRule()

    # Insert counts
    hq_short_inserts = ExactRule()
    hq_medium_inserts = ExactRule()
class MappingModelMixin(ModelMixinBase):
    """Rules for the mapping group of metrics."""

    # Base counts: absolute tolerance or exact match.
    mapped_bases_n = AbsoluteDiffRule(delta=10)
    mapped_subread_bases_n = ExactRule()

    # Percentage metrics: relative tolerance of 5 percent.
    missing_bases_pct = DiffPercentRule(percent=5.0)
    pct_first_subreads_ts_lt_100 = DiffPercentRule(percent=5.0)

    # Compared after rounding the difference to two decimal places.
    pct_first_subreads_ts_lt_50 = RoundRule(places=2)
class ResquencingModel(TrendModelBase, FilterModelMixin, MappingModelMixin):
    """
    Model combining the filter and mapping rule groups.

    Groupings of metric rules (a Model) are assembled by listing the mixins
    as bases alongside TrendModelBase.
    """
class CustomResequncingModel(ResquencingModel):
    """Per-job variant of the model: individual metric rules can be overridden here."""

    # Overrides the inherited rule for this metric with an absolute tolerance of 10.
    mapped_subread_bases_n = AbsoluteDiffRule(delta=10)
def model_runner(trend_model, metric_d):
    """
    Apply a trend model's rules to the supplied metrics and report failures.

    :param trend_model: Model Class (the class itself, not an instance)
    :param metric_d: dict {metric_name:[(changelist, value), (changelist, value)]}
    :return: list of metric names whose rule returned a falsy result
    :raises TypeError: if ``trend_model`` is not a class derived from
        TrendModelBase or ModelMixinBase, or if ``metric_d`` is not a dict
    """
    # some type checking
    # issubclass() itself raises on non-classes, so check for a class first.
    is_model_class = isinstance(trend_model, type) and issubclass(
        trend_model, (TrendModelBase, ModelMixinBase))
    if not is_model_class:
        d_ = dict(t=TrendModelBase, m=ModelMixinBase, x=type(trend_model))
        raise TypeError("expected type {t} with mixin {m}. Got type {x}".format(**d_))

    if not isinstance(metric_d, dict):
        raise TypeError("Expected dict type, got type {t}".format(t=type(metric_d)))

    # store the failed metrics
    failed_metrics = []

    metric_rules = trend_model.get_metric_rules()

    # dict.items() exists on Python 2 and 3; iteritems() is Python 2 only.
    for metric_name, metric_values in metric_d.items():
        # .get() so a metric without a rule reaches the warning below instead
        # of raising KeyError.
        rule = metric_rules.get(metric_name)
        if rule is None:
            warnings.warn("Unable to find metric {n}. Rules will not be applied.".format(n=metric_name))
            continue

        # apply rules
        state = rule.apply(metric_values)
        log.info("Rule {r} applied to {m}. Success? {s}.".format(r=rule, m=metric_name, s=state))
        if not state:
            failed_metrics.append(metric_name)

    log.debug("Number of failed metrics {m}/{t}.".format(m=len(failed_metrics), t=len(metric_d)))
    return failed_metrics
def get_example_metrics():
    """
    Build a sample metrics dict for trying out the rules.

    :return: dict {metric_name: [(changelist, value), (changelist, value)]}
    """
    # the metrics are ordered ascending by changelist
    return {
        'mapped_bases_n': [(1234, 1), (1235, 1)],
        'nmovies': [(1234, 1), (1235, 1)],
        # second changelist was a duplicated 1234; every series spans 1234 -> 1235
        'reads_n_pre_filter': [(1234, 1), (1235, 1)],
        # 5% allowed difference
        'pct_first_subreads_ts_lt_100': [(1234, 97), (1235, 100)],
        'pct_first_subreads_ts_lt_50': [(1234, 0.01), (1235, 0.009)],
    }
def get_failed_example_metrics():
    """
    Build a sample metrics dict for the failing case.

    These should all fail by design.

    :return: dict {metric_name: [(changelist, value), (changelist, value)]}
    """
    return {
        'mapped_bases_n': [(1234, 9), (1235, 20)],
        'nmovies': [(1234, 1), (1235, 2)],
        # second changelist was a duplicated 1234; every series spans 1234 -> 1235
        'reads_n_pre_filter': [(1234, 1), (1235, 2)],
        # 5% allowed difference
        'pct_first_subreads_ts_lt_100': [(1234, 94), (1235, 100)],
        # Round rule
        'pct_first_subreads_ts_lt_50': [(1234, 0.01), (1235, 0.02)],
    }
# Optional debug-logging setup, left disabled:
#from pysiv.utils import setup_log
#setup_log(log, level=logging.DEBUG)
# model_runner with ResquencingModel pre-bound as its first argument;
# call it with just the metrics dict.
run_resquencing_model = functools.partial(model_runner, ResquencingModel)
def full_example():
    """
    Placeholder describing the intended end-to-end workflow; does nothing.

    Sketch of the full workflow:

        siv_id = "Stuff"
        metrics_d = get_metrics_from_db(siv_id)
        # look up rules model from the siv_id
        rule_model = get_rules_model(siv_id)
        failed_metrics = model_runner(rule_model, metrics_d)

    Then, use the failed metrics for a given siv_id to create a new collection
    in the database which has siv_id, changelist, failed_metrics.
    """
    return None
def run_example(metrics_d):
    """
    Run the resequencing model over the given metrics.

    :param metrics_d: metrics dict passed straight to run_resquencing_model
    :return: whatever run_resquencing_model returns for these metrics
    """
    return run_resquencing_model(metrics_d)
def example():
    """Run the resequencing model on the metrics from get_example_metrics and return the result."""
    return run_resquencing_model(get_example_metrics())
def example_failed():
    """Run the resequencing model on the metrics from get_failed_example_metrics and return the result."""
    return run_resquencing_model(get_failed_example_metrics())
def examples():
    """Run example() and then example_failed(), discarding their results."""
    for demo in (example, example_failed):
        demo()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment