Skip to content

Instantly share code, notes, and snippets.

@iAnanich
Created February 1, 2018 14:30
Show Gist options
  • Save iAnanich/53a99ebc56460882d7872bb225cb093d to your computer and use it in GitHub Desktop.
Save iAnanich/53a99ebc56460882d7872bb225cb093d to your computer and use it in GitHub Desktop.
IterManager - helps with complex iteration cases - counters, excludes, context, type checks etc.
import typing
TypeOrNone = typing.Union[type, None]
def has_wrong_type(obj, expected_obj_type: TypeOrNone) -> bool:
"""
Checks if given `obj` object has not given `expected_obj_type` type. If
`expected_obj_type` is `None` than it will return `True`
:param obj: any object
:param expected_obj_type: expected type of the object or `None`
:return: `True` if `obj` object is not of `expected_obj_type` type, `False`
if `expected_obj_type` is `None` or `obj` object has `expected_obj_type` type
"""
# if `expected` type is `None` it will
# return False without `isinstance` call
return expected_obj_type is not None and not isinstance(obj, expected_obj_type)
def raise_type_error(obj_repr: str, obj_type: type, expected_obj_type: type,
obj_name: str ='This'):
raise TypeError(
f'{obj_name} {obj_repr} has "{obj_type}" type while '
f'"{expected_obj_type}" is expected.'
)
def check_obj_type(obj, expected_obj_type: TypeOrNone, obj_name: str ='object'):
if has_wrong_type(obj=obj, expected_obj_type=expected_obj_type):
raise_type_error(
obj_name=obj_name,
obj_repr=repr(obj),
obj_type=type(obj),
expected_obj_type=expected_obj_type,
)
import typing
ThresholdBasis = typing.Union[int, None]
class Threshold:
def __init__(self, val: ThresholdBasis=None):
if val is None or val is 0:
self._value = 0 # as equivalent to None
elif isinstance(val, int):
if val <= 0:
raise TypeError(
f'first argument must not be less then zero, got "{val}".')
else:
self._value = val
else:
raise TypeError(
f'first argument must be of type `int` or `NoneType`, '
f'got "{val}" of "{type(val)}" type.')
# public API
@property
def value(self) -> ThresholdBasis:
if not self: # if self._value == 0
return None
return self._value
def __int__(self):
if not self:
raise TypeError(
f'this threshold equals to None and '
f'can not be used as an integer.')
return self._value
def __bool__(self):
return self._value != 0
# string representations
def __repr__(self):
return f'<Threshold {self.value}>'
def __str__(self):
return str(self.value)
class Counter:
def __init__(self):
self._is_enabled = True
self._count = 0
def add(self) -> bool:
if self._is_enabled:
self._count += 1
return bool(self)
else:
return False
def drop(self):
if self._is_enabled:
self._count = 0
@property
def count(self) -> int:
return self._count
@property
def is_enabled(self) -> bool:
return self._is_enabled
def enable(self):
self._is_enabled = True
def disable(self):
self._is_enabled = False
# string representations
def __repr__(self):
return f'<Counter count={self._count}>'
def __str__(self):
return str(self._count)
class CounterWithThreshold(Counter):
def __init__(self, threshold: Threshold = None):
super().__init__()
if not threshold:
self._is_enabled = False
self._count = -1
self._threshold = threshold
@property
def threshold(self) -> Threshold:
return self._threshold
def __bool__(self):
if self._is_enabled:
return self._count >= int(self._threshold)
return True
def __repr__(self):
return f'<Counter {"REACHED" if bool(self) else "count=%s" % self._count}>'
import abc
import typing
from .check import has_wrong_type, raise_type_error, check_obj_type
class BaseFunc(abc.ABC):
def __init__(self, func, args: tuple = None, kwargs: dict = None):
if kwargs is None:
kwargs = dict()
if args is None:
args = tuple()
if not isinstance(args, tuple):
raise TypeError('Given `args` are not `tuple` object.')
if not isinstance(kwargs, dict):
raise TypeError('Given `kwargs` are not `dict` object.')
if not callable(func):
raise TypeError('Given `func` argument must be callable.')
self.function = func
self.args = args
self.kwargs = kwargs
@abc.abstractmethod
def call(self, input_value):
pass
class Func(BaseFunc):
def call(self, input_value):
return self.function(input_value, *self.args, **self.kwargs)
class StronglyTypedFunc(BaseFunc):
# None value will cause type check to pass any type
output_type = None
input_type = None
def __init__(self, func, args: tuple =None, kwargs: dict =None,
input_type: type =None, output_type: type =None):
super().__init__(
func=func,
args=args,
kwargs=kwargs,
)
# override class attributes
if input_type is not None:
self.input_type = input_type
if output_type is not None:
self.output_type = output_type
def call(self, input_value):
self._check_input(input_value)
output_value = self.function(input_value, *self.args, **self.kwargs)
self._check_output(output_value)
return output_value
def _check_input(self, value):
self._check_type(value, self.input_type, 'input')
def _check_output(self, value):
self._check_type(value, self.output_type, 'output')
def _check_type(self, value, expected: type or None, action: str):
if has_wrong_type(value, expected):
raise_type_error(
obj_repr=repr(value),
obj_type=type(value),
obj_name=f'{action.capitalize()} value',
expected_obj_type=expected,
)
class FuncSequence:
func_type = BaseFunc
def __init__(self, *funcs: func_type):
for i, func in enumerate(funcs):
check_obj_type(func, self.func_type, f'Callable #{i}')
self._list: typing.List[self.func_type] = list(funcs)
def process(self, value):
for middleware in self._list:
value = middleware.call(value)
else:
return value
# some list methods
def copy(self):
return self.__class__(*self._list)
def clear(self):
try:
while True:
self._list.pop()
except IndexError:
pass
def reverse(self):
sequence = self._list
n = len(sequence)
for i in range(n//2):
sequence[i], sequence[n - i - 1] = sequence[n - i - 1], sequence[i]
def pop(self, index: int =-1):
v = self._list[index]
del self._list[index]
return v
def append(self, func: func_type):
check_obj_type(func, self.func_type, f'Callable')
self._list.append(func)
def remove(self, value: func_type):
del self._list[self._list.index(value)]
def extend(self, funcs: typing.Sequence[func_type]):
for i, func in enumerate(funcs):
check_obj_type(func, self.func_type, f'Callable #{i}')
self._list.append(func)
import collections
import types
from typing import Iterator, Callable, Sequence
from .counter import Threshold, CounterWithThreshold
from .func import StronglyTypedFunc
from .check import check_obj_type
class ExcludeCheck:
def __init__(self, iterator: Iterator, default=None):
check_obj_type(iterator, collections.Iterator, 'Iterator')
self._iterator = iterator
self._default = default
self._is_completed = False
self._yield_next()
def _yield_next(self):
try:
value = next(self._iterator)
except StopIteration:
value = self._default
self._is_completed = True
self._value = value
return value
def check_next(self, value):
if self._is_completed:
return False
if value == self._value:
self._yield_next()
return True
return False
@property
def value(self):
return self._value
class BaseContext:
CLOSE_REASON = 'close_reason'
VALUE = 'value'
EXCLUDE_VALUE = 'exclude_value'
_lock_keys = frozenset((CLOSE_REASON, VALUE, EXCLUDE_VALUE))
_value_type: type = object
_exclude_value_type: type = object
def __init__(self, value, exclude_value):
check_obj_type(value, self._value_type, 'Value')
check_obj_type(exclude_value, self._exclude_value_type, 'Exclude value')
self._dict = {
self.VALUE: value,
self.EXCLUDE_VALUE: exclude_value,
}
def set_close_reason(self, message: str):
check_obj_type(message, str, 'Message')
if self.close_reason:
self._dict[self.CLOSE_REASON].append(message)
else:
self._dict[self.CLOSE_REASON] = [message, ]
@property
def value(self):
return self._dict[self.VALUE]
@property
def exclude_value(self):
return self._dict[self.EXCLUDE_VALUE]
@property
def close_reason(self) -> str:
"""
Returns last set close reason message.
:return: string
"""
close_reasons = self._dict.get(self.CLOSE_REASON, None)
if close_reasons:
return close_reasons[-1]
def dict_proxy(self):
return types.MappingProxyType(self._dict)
def update(self, dictionary: dict):
for key, val in dictionary.items():
self[key] = val
def __getitem__(self, item: str):
return self._dict[item]
def __setitem__(self, key: str, value):
if key not in self._lock_keys:
self._dict[key] = value
else:
raise KeyError(f'{key} key can not be assigned in this way.')
@classmethod
def new(cls, value_type: type, exclude_value_type: type,
name: str='Context') -> type:
attributes = {
'_value_type': value_type,
'_exclude_value_type': exclude_value_type
}
return type(name, (cls, ), attributes)
class IterManager:
_base_context_type = BaseContext
_context_processor_output_type = bool
def __init__(self, general_iterator: Iterator,
value_type: type =None, return_type: type =None,
exclude_value_type: type =None,
exclude_iterator: Iterator =None, exclude_default=None,
max_iterations: int or None =None,
max_exclude_strike: int or None =None,
max_total_excluded: int or None =None,
max_returned_values: int or None =None,
case_processors: Sequence[Callable] =None,
context_processor: Callable =None,
return_value_processor: Callable =None,
before_finish: Callable =None):
# `*_type` attributes can be even None because will be only used
# by `func.StronglyTypedFunc` that uses `check.check_obj_type`
self._value_type = value_type
self._return_type = return_type
self._exclude_type = exclude_value_type
check_obj_type(general_iterator, collections.Iterator, 'General iterator')
self._general_iterator = general_iterator
check_obj_type(exclude_default, exclude_value_type, 'Exclude default value')
self._exclude_default = exclude_default
if exclude_iterator is None:
exclude_iterator = iter([]) # empty iterator
self._exclude_checker = ExcludeCheck(
iterator=exclude_iterator,
default=self._exclude_default)
self._exclude_iterator = exclude_iterator
self._total_iterations_threshold = Threshold(max_iterations)
self._total_iterations_counter = CounterWithThreshold(
threshold=self._total_iterations_threshold)
self._exclude_strike_threshold = Threshold(max_exclude_strike)
self._exclude_strike_counter = CounterWithThreshold(
threshold=self._exclude_strike_threshold)
self._total_excluded_threshold = Threshold(max_total_excluded)
self._total_excluded_counter = CounterWithThreshold(
threshold=self._total_excluded_threshold)
self._total_returned_threshold = Threshold(max_returned_values)
self._total_returned_counter = CounterWithThreshold(
threshold=self._total_returned_threshold)
self._context_type = self._base_context_type.new(value_type, exclude_value_type)
if context_processor is None:
context_processor = lambda value: BaseContext(value=value, exclude_value=value)
self._context_processor = StronglyTypedFunc(
func=context_processor,
kwargs={'context_type': self._context_type},
input_type=self._value_type,
output_type=self._context_type, )
if before_finish is None:
before_finish = lambda ctx: None
self._before_finish = StronglyTypedFunc(
func=before_finish,
input_type=self._context_type,
output_type=None, )
if return_value_processor is None:
return_value_processor = lambda ctx: ctx.value
self._return_value_processor = StronglyTypedFunc(
func=return_value_processor,
input_type=self._context_type,
output_type=self._return_type,)
if case_processors is None:
case_processors = []
self._case_processors = [
StronglyTypedFunc(
func=processor,
input_type=self._context_type,
output_type=self._context_processor_output_type, )
for processor in case_processors]
def _chain_case_processors(self, context: BaseContext) -> bool:
"""
:param context: BaseContext object
:return: True if any case processor have returned True, else False
"""
for processor in self._case_processors:
if processor.call(context):
return True
else:
return False
def _check_exclude(self, context: BaseContext) -> bool:
"""
:param context: BaseContext object
:return: True if value must be returned, else False
"""
if self._exclude_checker.check_next(context.exclude_value):
if self._exclude_strike_counter.add():
context.set_close_reason('Exclude matches threshold reached.')
if self._total_excluded_counter.add():
context.set_close_reason('Total excluded threshold reached.')
return False
else:
self._exclude_strike_counter.drop()
return True
def _return(self, context: BaseContext) -> object:
"""
Increases `total_returned_counter`, and calls `return_value_processor`
:param context: BaseContext object
:return: returns processed value
"""
if self._total_returned_counter.add():
context.set_close_reason('Returned values threshold reached.')
return self._return_value_processor.call(context)
def __iter__(self):
for value in self._general_iterator:
context: BaseContext = self._context_processor.call(value)
if self._chain_case_processors(context):
continue
if self._check_exclude(context):
yield self._return(context)
if self._total_iterations_counter.add():
context.set_close_reason('Iterations count threshold reached.')
if context.close_reason:
self._before_finish.call(context)
break
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment