Skip to content

Instantly share code, notes, and snippets.

@rominf

rominf/step_builder.py Secret

Created Jul 13, 2020
Embed
What would you like to do?
repr take too long
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# __coconut_hash__ = 0x88978a7c
# Compiled with Coconut version 1.4.3 [Ernest Scribbler]
# Coconut Header: -------------------------------------------------------------
from __future__ import generator_stop, annotations
import sys as _coconut_sys
from builtins import chr, filter, hex, input, int, map, object, oct, open, print, range, str, zip, filter, reversed, enumerate
py_chr, py_hex, py_input, py_int, py_map, py_object, py_oct, py_open, py_print, py_range, py_str, py_zip, py_filter, py_reversed, py_enumerate, py_repr = chr, hex, input, int, map, object, oct, open, print, range, str, zip, filter, reversed, enumerate, repr
_coconut_str = str
class _coconut:
import collections, copy, functools, types, itertools, operator, threading, weakref, os, warnings
import pickle
OrderedDict = collections.OrderedDict
if _coconut_sys.version_info < (3, 3):
abc = collections
else:
import collections.abc as abc
import typing
Ellipsis, Exception, AttributeError, ImportError, IndexError, KeyError, NameError, TypeError, ValueError, StopIteration, classmethod, dict, enumerate, filter, float, frozenset, getattr, hasattr, hash, id, int, isinstance, issubclass, iter, len, list, locals, map, min, max, next, object, property, range, reversed, set, slice, str, sum, super, tuple, type, zip, repr = Ellipsis, Exception, AttributeError, ImportError, IndexError, KeyError, NameError, TypeError, ValueError, StopIteration, classmethod, dict, enumerate, filter, float, frozenset, getattr, hasattr, hash, id, int, isinstance, issubclass, iter, len, list, locals, map, min, max, next, object, property, range, reversed, set, slice, str, sum, super, tuple, type, zip, repr
_coconut_sentinel = _coconut.object()
class MatchError(Exception):
"""Pattern-matching error. Has attributes .pattern and .value."""
__slots__ = ("pattern", "value")
class _coconut_tail_call:
__slots__ = ("func", "args", "kwargs")
def __init__(self, func, *args, **kwargs):
self.func, self.args, self.kwargs = func, args, kwargs
_coconut_tco_func_dict = {}
def _coconut_tco(func):
@_coconut.functools.wraps(func)
def tail_call_optimized_func(*args, **kwargs):
call_func = func
while True:
wkref = _coconut_tco_func_dict.get(_coconut.id(call_func))
if (wkref is not None and wkref() is call_func) or _coconut.isinstance(call_func, _coconut_base_pattern_func):
call_func = call_func._coconut_tco_func
result = call_func(*args, **kwargs) # pass --no-tco to clean up your traceback
if not isinstance(result, _coconut_tail_call):
return result
call_func, args, kwargs = result.func, result.args, result.kwargs
tail_call_optimized_func._coconut_tco_func = func
tail_call_optimized_func.__module__ = _coconut.getattr(func, "__module__", None)
tail_call_optimized_func.__name__ = _coconut.getattr(func, "__name__", "<coconut tco function (pass --no-tco to remove)>")
tail_call_optimized_func.__qualname__ = _coconut.getattr(func, "__qualname__", tail_call_optimized_func.__name__)
_coconut_tco_func_dict[_coconut.id(tail_call_optimized_func)] = _coconut.weakref.ref(tail_call_optimized_func)
return tail_call_optimized_func
def _coconut_igetitem(iterable, index):
if isinstance(iterable, (_coconut_reversed, _coconut_map, _coconut.zip, _coconut_enumerate, _coconut_count, _coconut.abc.Sequence)):
return iterable[index]
if not _coconut.isinstance(index, _coconut.slice):
if index < 0:
return _coconut.collections.deque(iterable, maxlen=-index)[0]
return _coconut.next(_coconut.itertools.islice(iterable, index, index + 1))
if index.start is not None and index.start < 0 and (index.stop is None or index.stop < 0) and index.step is None:
queue = _coconut.collections.deque(iterable, maxlen=-index.start)
if index.stop is not None:
queue = _coconut.list(queue)[:index.stop - index.start]
return queue
if (index.start is not None and index.start < 0) or (index.stop is not None and index.stop < 0) or (index.step is not None and index.step < 0):
return _coconut.list(iterable)[index]
return _coconut.itertools.islice(iterable, index.start, index.stop, index.step)
class _coconut_base_compose:
__slots__ = ("func", "funcstars")
def __init__(self, func, *funcstars):
self.func = func
self.funcstars = []
for f, stars in funcstars:
if _coconut.isinstance(f, _coconut_base_compose):
self.funcstars.append((f.func, stars))
self.funcstars += f.funcstars
else:
self.funcstars.append((f, stars))
def __call__(self, *args, **kwargs):
arg = self.func(*args, **kwargs)
for f, stars in self.funcstars:
if stars == 0:
arg = f(arg)
elif stars == 1:
arg = f(*arg)
elif stars == 2:
arg = f(**arg)
else:
raise _coconut.ValueError("invalid arguments to " + _coconut.repr(self))
return arg
def __repr__(self):
return _coconut.repr(self.func) + " " + " ".join(("..*> " if star == 1 else "..**>" if star == 2 else "..> ") + _coconut.repr(f) for f, star in self.funcstars)
def __reduce__(self):
return (self.__class__, (self.func,) + _coconut.tuple(self.funcstars))
def __get__(self, obj, objtype=None):
return _coconut.functools.partial(self, obj)
def _coconut_forward_compose(func, *funcs): return _coconut_base_compose(func, *((f, 0) for f in funcs))
def _coconut_back_compose(*funcs): return _coconut_forward_compose(*_coconut.reversed(funcs))
def _coconut_forward_star_compose(func, *funcs): return _coconut_base_compose(func, *((f, 1) for f in funcs))
def _coconut_back_star_compose(*funcs): return _coconut_forward_star_compose(*_coconut.reversed(funcs))
def _coconut_forward_dubstar_compose(func, *funcs): return _coconut_base_compose(func, *((f, 2) for f in funcs))
def _coconut_back_dubstar_compose(*funcs): return _coconut_forward_dubstar_compose(*_coconut.reversed(funcs))
def _coconut_pipe(x, f): return f(x)
def _coconut_star_pipe(xs, f): return f(*xs)
def _coconut_dubstar_pipe(kws, f): return f(**kws)
def _coconut_back_pipe(f, x): return f(x)
def _coconut_back_star_pipe(f, xs): return f(*xs)
def _coconut_back_dubstar_pipe(f, kws): return f(**kws)
def _coconut_assert(cond, msg=None): assert cond, msg if msg is not None else "(assert) got falsey value " + _coconut.repr(cond)
def _coconut_bool_and(a, b): return a and b
def _coconut_bool_or(a, b): return a or b
def _coconut_none_coalesce(a, b): return a if a is not None else b
def _coconut_minus(a, *rest):
if not rest:
return -a
for b in rest:
a = a - b
return a
@_coconut.functools.wraps(_coconut.itertools.tee)
def tee(iterable, n=2):
if n >= 0 and _coconut.isinstance(iterable, (_coconut.tuple, _coconut.frozenset)):
return (iterable,) * n
if n > 0 and (_coconut.hasattr(iterable, "__copy__") or _coconut.isinstance(iterable, _coconut.abc.Sequence)):
return (iterable,) + _coconut.tuple(_coconut.copy.copy(iterable) for _ in _coconut.range(n - 1))
return _coconut.itertools.tee(iterable, n)
class reiterable:
"""Allows an iterator to be iterated over multiple times."""
__slots__ = ("iter",)
def __init__(self, iterable):
self.iter = iterable
def _get_new_iter(self):
self.iter, new_iter = _coconut_tee(self.iter)
return new_iter
def __iter__(self):
return _coconut.iter(self._get_new_iter())
def __getitem__(self, index):
return _coconut_igetitem(self._get_new_iter(), index)
def __reversed__(self):
return _coconut_reversed(self._get_new_iter())
def __len__(self):
return _coconut.len(self.iter)
def __repr__(self):
return "reiterable(%r)" % (self.iter,)
def __reduce__(self):
return (self.__class__, (self.iter,))
def __copy__(self):
return self.__class__(self._get_new_iter())
def __fmap__(self, func):
return _coconut_map(func, self)
class scan:
"""Reduce func over iterable, yielding intermediate results,
optionally starting from initializer."""
__slots__ = ("func", "iter", "initializer")
def __init__(self, function, iterable, initializer=_coconut_sentinel):
self.func = function
self.iter = iterable
self.initializer = initializer
def __iter__(self):
acc = self.initializer
if acc is not _coconut_sentinel:
yield acc
for item in self.iter:
if acc is _coconut_sentinel:
acc = item
else:
acc = self.func(acc, item)
yield acc
def __len__(self):
return _coconut.len(self.iter)
def __repr__(self):
return "scan(%r, %r)" % (self.func, self.iter)
def __reduce__(self):
return (self.__class__, (self.func, self.iter))
def __copy__(self):
return self.__class__(self.func, _coconut.copy.copy(self.iter))
def __fmap__(self, func):
return _coconut_map(func, self)
class reversed:
__slots__ = ("iter",)
if hasattr(_coconut.map, "__doc__"):
__doc__ = _coconut.reversed.__doc__
def __new__(cls, iterable):
if _coconut.isinstance(iterable, _coconut.range):
return iterable[::-1]
if not _coconut.hasattr(iterable, "__reversed__") or _coconut.isinstance(iterable, (_coconut.list, _coconut.tuple)):
return _coconut.object.__new__(cls)
return _coconut.reversed(iterable)
def __init__(self, iterable):
self.iter = iterable
def __iter__(self):
return _coconut.iter(_coconut.reversed(self.iter))
def __getitem__(self, index):
if _coconut.isinstance(index, _coconut.slice):
return _coconut_igetitem(self.iter, _coconut.slice(-(index.start + 1) if index.start is not None else None, -(index.stop + 1) if index.stop else None, -(index.step if index.step is not None else 1)))
return _coconut_igetitem(self.iter, -(index + 1))
def __reversed__(self):
return self.iter
def __len__(self):
return _coconut.len(self.iter)
def __repr__(self):
return "reversed(%r)" % (self.iter,)
def __hash__(self):
return -_coconut.hash(self.iter)
def __reduce__(self):
return (self.__class__, (self.iter,))
def __copy__(self):
return self.__class__(_coconut.copy.copy(self.iter))
def __eq__(self, other):
return isinstance(other, self.__class__) and self.iter == other.iter
def __contains__(self, elem):
return elem in self.iter
def count(self, elem):
"""Count the number of times elem appears in the reversed iterator."""
return self.iter.count(elem)
def index(self, elem):
"""Find the index of elem in the reversed iterator."""
return _coconut.len(self.iter) - self.iter.index(elem) - 1
def __fmap__(self, func):
return self.__class__(_coconut_map(func, self.iter))
class map(_coconut.map):
__slots__ = ("func", "iters")
if hasattr(_coconut.map, "__doc__"):
__doc__ = _coconut.map.__doc__
def __new__(cls, function, *iterables):
new_map = _coconut.map.__new__(cls, function, *iterables)
new_map.func = function
new_map.iters = iterables
return new_map
def __getitem__(self, index):
if _coconut.isinstance(index, _coconut.slice):
return self.__class__(self.func, *(_coconut_igetitem(i, index) for i in self.iters))
return self.func(*(_coconut_igetitem(i, index) for i in self.iters))
def __reversed__(self):
return self.__class__(self.func, *(_coconut_reversed(i) for i in self.iters))
def __len__(self):
return _coconut.min(_coconut.len(i) for i in self.iters)
def __repr__(self):
return "map(%r, %s)" % (self.func, ", ".join((_coconut.repr(i) for i in self.iters)))
def __reduce__(self):
return (self.__class__, (self.func,) + self.iters)
def __reduce_ex__(self, _):
return self.__reduce__()
def __copy__(self):
return self.__class__(self.func, *_coconut.map(_coconut.copy.copy, self.iters))
def __fmap__(self, func):
return self.__class__(_coconut_forward_compose(self.func, func), *self.iters)
class parallel_map(map):
"""Multi-process implementation of map using concurrent.futures.
Requires arguments to be pickleable."""
__slots__ = ()
def __iter__(self):
from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor() as executor:
return _coconut.iter(_coconut.list(executor.map(self.func, *self.iters)))
def __repr__(self):
return "parallel_" + _coconut_map.__repr__(self)
class concurrent_map(map):
"""Multi-thread implementation of map using concurrent.futures."""
__slots__ = ()
def __iter__(self):
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor() as executor:
return _coconut.iter(_coconut.list(executor.map(self.func, *self.iters)))
def __repr__(self):
return "concurrent_" + _coconut_map.__repr__(self)
class filter(_coconut.filter):
__slots__ = ("func", "iter")
if hasattr(_coconut.filter, "__doc__"):
__doc__ = _coconut.filter.__doc__
def __new__(cls, function, iterable):
new_filter = _coconut.filter.__new__(cls, function, iterable)
new_filter.func = function
new_filter.iter = iterable
return new_filter
def __reversed__(self):
return self.__class__(self.func, _coconut_reversed(self.iter))
def __repr__(self):
return "filter(%r, %r)" % (self.func, self.iter)
def __reduce__(self):
return (self.__class__, (self.func, self.iter))
def __reduce_ex__(self, _):
return self.__reduce__()
def __copy__(self):
return self.__class__(self.func, _coconut.copy.copy(self.iter))
def __fmap__(self, func):
return _coconut_map(func, self)
class zip(_coconut.zip):
__slots__ = ("iters",)
if hasattr(_coconut.zip, "__doc__"):
__doc__ = _coconut.zip.__doc__
def __new__(cls, *iterables):
new_zip = _coconut.zip.__new__(cls, *iterables)
new_zip.iters = iterables
return new_zip
def __getitem__(self, index):
if _coconut.isinstance(index, _coconut.slice):
return self.__class__(*(_coconut_igetitem(i, index) for i in self.iters))
return _coconut.tuple(_coconut_igetitem(i, index) for i in self.iters)
def __reversed__(self):
return self.__class__(*(_coconut_reversed(i) for i in self.iters))
def __len__(self):
return _coconut.min(_coconut.len(i) for i in self.iters)
def __repr__(self):
return "zip(%s)" % (", ".join((_coconut.repr(i) for i in self.iters)),)
def __reduce__(self):
return (self.__class__, self.iters)
def __reduce_ex__(self, _):
return self.__reduce__()
def __copy__(self):
return self.__class__(*_coconut.map(_coconut.copy.copy, self.iters))
def __fmap__(self, func):
return _coconut_map(func, self)
class enumerate(_coconut.enumerate):
__slots__ = ("iter", "start")
if hasattr(_coconut.enumerate, "__doc__"):
__doc__ = _coconut.enumerate.__doc__
def __new__(cls, iterable, start=0):
new_enumerate = _coconut.enumerate.__new__(cls, iterable, start)
new_enumerate.iter = iterable
new_enumerate.start = start
return new_enumerate
def __getitem__(self, index):
if _coconut.isinstance(index, _coconut.slice):
return self.__class__(_coconut_igetitem(self.iter, index), self.start + (0 if index.start is None else index.start if index.start >= 0 else len(self.iter) + index.start))
return (self.start + index, _coconut_igetitem(self.iter, index))
def __len__(self):
return _coconut.len(self.iter)
def __repr__(self):
return "enumerate(%r, %r)" % (self.iter, self.start)
def __reduce__(self):
return (self.__class__, (self.iter, self.start))
def __reduce_ex__(self, _):
return self.__reduce__()
def __copy__(self):
return self.__class__(_coconut.copy.copy(self.iter), self.start)
def __fmap__(self, func):
return _coconut_map(func, self)
class count:
"""count(start, step) returns an infinite iterator starting at start and increasing by step.
If step is set to 0, count will infinitely repeat its first argument."""
__slots__ = ("start", "step")
def __init__(self, start=0, step=1):
self.start = start
self.step = step
def __iter__(self):
while True:
yield self.start
if self.step:
self.start += self.step
def __contains__(self, elem):
if not self.step:
return elem == self.start
if elem < self.start:
return False
return (elem - self.start) % self.step == 0
def __getitem__(self, index):
if _coconut.isinstance(index, _coconut.slice) and (index.start is None or index.start >= 0) and (index.stop is None or index.stop >= 0):
new_start, new_step = self.start, self.step
if self.step and index.start is not None:
new_start += self.step * index.start
if self.step and index.step is not None:
new_step *= index.step
if index.stop is None:
return self.__class__(new_start, new_step)
if self.step and _coconut.isinstance(self.start, _coconut.int) and _coconut.isinstance(self.step, _coconut.int):
return _coconut.range(new_start, self.start + self.step * index.stop, new_step)
return _coconut_map(self.__getitem__, _coconut.range(index.start if index.start is not None else 0, index.stop, index.step if index.step is not None else 1))
if index < 0:
raise _coconut.IndexError("count indices must be positive")
return self.start + self.step * index if self.step else self.start
def count(self, elem):
"""Count the number of times elem appears in the count."""
if not self.step:
return _coconut.float("inf") if elem == self.start else 0
return int(elem in self)
def index(self, elem):
"""Find the index of elem in the count."""
if elem not in self:
raise _coconut.ValueError(_coconut.repr(elem) + " not in " + _coconut.repr(self))
return (elem - self.start) // self.step if self.step else 0
def __reversed__(self):
if not self.step:
return self
raise _coconut.TypeError(repr(self) + " object is not reversible")
def __repr__(self):
return "count(%r, %r)" % (self.start, self.step)
def __hash__(self):
return _coconut.hash((self.start, self.step))
def __reduce__(self):
return (self.__class__, (self.start, self.step))
def __copy__(self):
return self.__class__(self.start, self.step)
def __eq__(self, other):
return isinstance(other, self.__class__) and self.start == other.start and self.step == other.step
def __fmap__(self, func):
return _coconut_map(func, self)
class groupsof:
"""groupsof(n, iterable) splits iterable into groups of size n.
If the length of the iterable is not divisible by n, the last group may be of size < n."""
__slots__ = ("group_size", "iter")
def __init__(self, n, iterable):
self.iter = iterable
try:
self.group_size = _coconut.int(n)
except _coconut.ValueError:
raise _coconut.TypeError("group size must be an int; not %r" % (n,))
if self.group_size <= 0:
raise _coconut.ValueError("group size must be > 0; not %r" % (self.group_size,))
def __iter__(self):
iterator = _coconut.iter(self.iter)
loop = True
while loop:
group = []
for _ in _coconut.range(self.group_size):
try:
group.append(_coconut.next(iterator))
except _coconut.StopIteration:
loop = False
break
if group:
yield _coconut.tuple(group)
def __len__(self):
return _coconut.len(self.iter)
def __repr__(self):
return "groupsof(%r)" % (self.iter,)
def __reduce__(self):
return (self.__class__, (self.group_size, self.iter))
def __copy__(self):
return self.__class__(self.group_size, _coconut.copy.copy(self.iter))
def __fmap__(self, func):
return _coconut_map(func, self)
class recursive_iterator:
"""Decorator that optimizes a function for iterator recursion."""
__slots__ = ("func", "tee_store", "backup_tee_store")
def __init__(self, func):
self.func = func
self.tee_store = {}
self.backup_tee_store = []
def __call__(self, *args, **kwargs):
key = (args, _coconut.frozenset(kwargs))
use_backup = False
try:
hash(key)
except _coconut.Exception:
try:
key = _coconut.pickle.dumps(key, -1)
except _coconut.Exception:
use_backup = True
if use_backup:
for i, (k, v) in _coconut.enumerate(self.backup_tee_store):
if k == key:
to_tee, store_pos = v, i
break
else: # no break
to_tee = self.func(*args, **kwargs)
store_pos = None
to_store, to_return = _coconut_tee(to_tee)
if store_pos is None:
self.backup_tee_store.append([key, to_store])
else:
self.backup_tee_store[store_pos][1] = to_store
else:
self.tee_store[key], to_return = _coconut_tee(self.tee_store.get(key) or self.func(*args, **kwargs))
return to_return
def __repr__(self):
return "@recursive_iterator(" + _coconut.repr(self.func) + ")"
def __reduce__(self):
return (self.__class__, (self.func,))
def __get__(self, obj, objtype=None):
return _coconut.functools.partial(self, obj)
class _coconut_FunctionMatchErrorContext(object):
__slots__ = ('exc_class', 'taken')
threadlocal_var = _coconut.threading.local()
def __init__(self, exc_class):
self.exc_class = exc_class
self.taken = False
def __enter__(self):
try:
self.threadlocal_var.contexts.append(self)
except _coconut.AttributeError:
self.threadlocal_var.contexts = [self]
def __exit__(self, type, value, traceback):
self.threadlocal_var.contexts.pop()
@classmethod
def get(cls):
try:
ctx = cls.threadlocal_var.contexts[-1]
except (_coconut.AttributeError, _coconut.IndexError):
return _coconut_MatchError
if not ctx.taken:
ctx.taken = True
return ctx.exc_class
return _coconut_MatchError
_coconut_get_function_match_error = _coconut_FunctionMatchErrorContext.get
class _coconut_base_pattern_func:
__slots__ = ("FunctionMatchError", "__doc__", "patterns")
_coconut_is_match = True
def __init__(self, *funcs):
self.FunctionMatchError = _coconut.type(_coconut_str("MatchError"), (_coconut_MatchError,), {})
self.__doc__ = None
self.patterns = []
for func in funcs:
self.add(func)
def add(self, func):
self.__doc__ = _coconut.getattr(func, "__doc__", None) or self.__doc__
if _coconut.isinstance(func, _coconut_base_pattern_func):
self.patterns += func.patterns
else:
self.patterns.append(func)
def __call__(self, *args, **kwargs):
for func in self.patterns[:-1]:
try:
with _coconut_FunctionMatchErrorContext(self.FunctionMatchError):
return func(*args, **kwargs)
except self.FunctionMatchError:
pass
return self.patterns[-1](*args, **kwargs)
def _coconut_tco_func(self, *args, **kwargs):
for func in self.patterns[:-1]:
try:
with _coconut_FunctionMatchErrorContext(self.FunctionMatchError):
return func(*args, **kwargs)
except self.FunctionMatchError:
pass
return _coconut_tail_call(self.patterns[-1], *args, **kwargs)
def __repr__(self):
return "addpattern(" + _coconut.repr(self.patterns[0]) + ")(*" + _coconut.repr(self.patterns[1:]) + ")"
def __reduce__(self):
return (self.__class__, _coconut.tuple(self.patterns))
def __get__(self, obj, objtype=None):
if obj is None:
return self
return _coconut.functools.partial(self, obj)
def _coconut_mark_as_match(base_func):
base_func._coconut_is_match = True
return base_func
def addpattern(base_func, **kwargs):
"""Decorator to add a new case to a pattern-matching function,
where the new case is checked last."""
allow_any_func = kwargs.pop("allow_any_func", False)
if not allow_any_func and not _coconut.getattr(base_func, "_coconut_is_match", False):
_coconut.warnings.warn("Possible misuse of addpattern with non-pattern-matching function " + _coconut.repr(base_func) + " (pass allow_any_func=True to dismiss)", stacklevel=2)
if kwargs:
raise _coconut.TypeError("addpattern() got unexpected keyword arguments " + _coconut.repr(kwargs))
return _coconut.functools.partial(_coconut_base_pattern_func, base_func)
_coconut_addpattern = addpattern
def prepattern(base_func, **kwargs):
"""DEPRECATED: Use addpattern instead."""
def pattern_prepender(func):
return addpattern(func, **kwargs)(base_func)
return pattern_prepender
class _coconut_partial:
__slots__ = ("func", "_argdict", "_arglen", "_stargs", "keywords")
if hasattr(_coconut.functools.partial, "__doc__"):
__doc__ = _coconut.functools.partial.__doc__
def __init__(self, func, argdict, arglen, *args, **kwargs):
self.func = func
self._argdict = argdict
self._arglen = arglen
self._stargs = args
self.keywords = kwargs
def __reduce__(self):
return (self.__class__, (self.func, self._argdict, self._arglen) + self._stargs, self.keywords)
def __setstate__(self, keywords):
self.keywords = keywords
@property
def args(self):
return _coconut.tuple(self._argdict.get(i) for i in _coconut.range(self._arglen)) + self._stargs
def __call__(self, *args, **kwargs):
callargs = []
argind = 0
for i in _coconut.range(self._arglen):
if i in self._argdict:
callargs.append(self._argdict[i])
elif argind >= _coconut.len(args):
raise _coconut.TypeError("expected at least " + _coconut.str(self._arglen - _coconut.len(self._argdict)) + " argument(s) to " + _coconut.repr(self))
else:
callargs.append(args[argind])
argind += 1
callargs += self._stargs
callargs += args[argind:]
kwargs.update(self.keywords)
return self.func(*callargs, **kwargs)
def __repr__(self):
args = []
for i in _coconut.range(self._arglen):
if i in self._argdict:
args.append(_coconut.repr(self._argdict[i]))
else:
args.append("?")
for arg in self._stargs:
args.append(_coconut.repr(arg))
return _coconut.repr(self.func) + "$(" + ", ".join(args) + ")"
def consume(iterable, keep_last=0):
"""consume(iterable, keep_last) fully exhausts iterable and return the last keep_last elements."""
return _coconut.collections.deque(iterable, maxlen=keep_last)
class starmap(_coconut.itertools.starmap):
__slots__ = ("func", "iter")
if hasattr(_coconut.itertools.starmap, "__doc__"):
__doc__ = _coconut.itertools.starmap.__doc__
def __new__(cls, function, iterable):
new_map = _coconut.itertools.starmap.__new__(cls, function, iterable)
new_map.func = function
new_map.iter = iterable
return new_map
def __getitem__(self, index):
if _coconut.isinstance(index, _coconut.slice):
return self.__class__(self.func, _coconut_igetitem(self.iter, index))
return self.func(*_coconut_igetitem(self.iter, index))
def __reversed__(self):
return self.__class__(self.func, *_coconut_reversed(self.iter))
def __len__(self):
return _coconut.len(self.iter)
def __repr__(self):
return "starmap(%r, %r)" % (self.func, self.iter)
def __reduce__(self):
return (self.__class__, (self.func, self.iter))
def __reduce_ex__(self, _):
return self.__reduce__()
def __copy__(self):
return self.__class__(self.func, _coconut.copy.copy(self.iter))
def __fmap__(self, func):
return self.__class__(_coconut_forward_compose(self.func, func), self.iter)
def makedata(data_type, *args):
"""Construct an object of the given data_type containing the given arguments."""
if _coconut.hasattr(data_type, "_make") and _coconut.issubclass(data_type, _coconut.tuple):
return data_type._make(args)
if _coconut.issubclass(data_type, (_coconut.map, _coconut.range, _coconut.abc.Iterator)):
return args
if _coconut.issubclass(data_type, _coconut.str):
return "".join(args)
return data_type(args)
def datamaker(data_type):
"""DEPRECATED: Use makedata instead."""
return _coconut.functools.partial(makedata, data_type)
def fmap(func, obj):
"""fmap(func, obj) creates a copy of obj with func applied to its contents.
Override by defining obj.__fmap__(func)."""
if _coconut.hasattr(obj, "__fmap__"):
return obj.__fmap__(func)
if obj.__class__.__module__ == "numpy":
from numpy import vectorize
return vectorize(func)(obj)
return _coconut_makedata(obj.__class__, *(_coconut_starmap(func, obj.items()) if _coconut.isinstance(obj, _coconut.abc.Mapping) else _coconut_map(func, obj)))
def memoize(maxsize=None, *args, **kwargs):
"""Decorator that memoizes a function,
preventing it from being recomputed if it is called multiple times with the same arguments."""
return _coconut.functools.lru_cache(maxsize, *args, **kwargs)
_coconut_MatchError, _coconut_count, _coconut_enumerate, _coconut_makedata, _coconut_map, _coconut_reversed, _coconut_starmap, _coconut_tee, _coconut_zip, TYPE_CHECKING, reduce, takewhile, dropwhile = MatchError, count, enumerate, makedata, map, reversed, starmap, tee, zip, False, _coconut.functools.reduce, _coconut.itertools.takewhile, _coconut.itertools.dropwhile
# Compiled Coconut: -----------------------------------------------------------
from enum import Enum
from typing import Optional
from typing import Type
import os
import lmdb
from legdb import Database
from legdb import Entity
from legdb import Node
from legdb import Edge
from legdb.step import SourceStep
from legdb.step import HasStep
from legdb.step import EdgeInStep
from legdb.step import EdgeOutStep
from legdb.step import EdgeAllStep
from legdb.step import PynndbFilterStep
from legdb.step import PynndbEdgeInStep
from legdb.step import PynndbEdgeOutStep
from legdb.step import PynndbEdgeAllStep
from legdb.step import PynndbUnionStep
class EdgeType(Enum):
IN = "in"
OUT = "out"
class StepBuilder:
def __init__(self, database: 'Optional[Database]'=None, node_cls: 'Type[Entity]'=Node, edge_cls: 'Type[Entity]'=Edge, page_size: 'int'=65536, txn: 'Optional[lmdb.Transaction]'=None,) -> 'None':
self._compiled_steps = []
self._database = database
self._edge_cls = edge_cls
self._is_compiled = False
self._node_cls = node_cls
self._page_size = page_size
self._steps = []
self._txn = txn
def source(self, what: 'Type[Entity]') -> 'StepBuilder':
if self._steps:
raise ValueError("Step 'source' should be the first.")
self._steps.append(SourceStep(what=what))
return self
def has(self, **kwargs) -> 'StepBuilder':
self._steps.append(HasStep(**kwargs))
return self
def edge_in(self, **kwargs) -> 'StepBuilder':
self._steps.append(EdgeInStep(**kwargs))
return self
def edge_out(self, **kwargs) -> 'StepBuilder':
self._steps.append(EdgeOutStep(**kwargs))
return self
def edge_all(self, **kwargs) -> 'StepBuilder':
self._steps.append(EdgeAllStep(**kwargs))
return self
@_coconut_tco
def __repr__(self) -> 'str':
return _coconut_tail_call(".".join, (repr(step) for step in self._steps))
@_coconut_mark_as_match
def _compile(*_coconut_match_to_args, **_coconut_match_to_kwargs):
_coconut_match_check = False
_coconut_FunctionMatchError = _coconut_get_function_match_error()
if (_coconut.len(_coconut_match_to_args) <= 1) and (_coconut.sum((_coconut.len(_coconut_match_to_args) > 0, "self" in _coconut_match_to_kwargs)) == 1):
_coconut_match_temp_0 = _coconut_match_to_args[0] if _coconut.len(_coconut_match_to_args) > 0 else _coconut_match_to_kwargs.pop("self")
if not _coconut_match_to_kwargs:
self = _coconut_match_temp_0
_coconut_match_check = True
if not _coconut_match_check:
_coconut_match_val_repr = _coconut.repr(_coconut_match_to_args)
_coconut_match_err = _coconut_FunctionMatchError("pattern-matching failed for " "'match def _compile(self):'" " in " + (_coconut_match_val_repr if _coconut.len(_coconut_match_val_repr) <= 500 else _coconut_match_val_repr[:500] + "..."))
_coconut_match_err.pattern = 'match def _compile(self):'
_coconut_match_err.value = _coconut_match_to_args
raise _coconut_match_err
raise NotImplementedError()
@_coconut_addpattern(_compile)
@_coconut_mark_as_match
def _compile(*_coconut_match_to_args, **_coconut_match_to_kwargs):
_coconut_match_check = False
_coconut_FunctionMatchError = _coconut_get_function_match_error()
if (_coconut.len(_coconut_match_to_args) == 2) and ("self" not in _coconut_match_to_kwargs) and (_coconut.isinstance(_coconut_match_to_args[1], _coconut.abc.Sequence)) and (_coconut.len(_coconut_match_to_args[1]) == 1) and (_coconut.isinstance(_coconut_match_to_args[1][0], SourceStep)):
_coconut_match_temp_0 = _coconut_match_to_args[0] if _coconut.len(_coconut_match_to_args) > 0 else _coconut_match_to_kwargs.pop("self")
step = _coconut_match_to_args[1][0]
if not _coconut_match_to_kwargs:
self = _coconut_match_temp_0
_coconut_match_check = True
if not _coconut_match_check:
_coconut_match_val_repr = _coconut.repr(_coconut_match_to_args)
_coconut_match_err = _coconut_FunctionMatchError("pattern-matching failed for " "'addpattern def _compile(self, (step is SourceStep, )):'" " in " + (_coconut_match_val_repr if _coconut.len(_coconut_match_val_repr) <= 500 else _coconut_match_val_repr[:500] + "..."))
_coconut_match_err.pattern = 'addpattern def _compile(self, (step is SourceStep, )):'
_coconut_match_err.value = _coconut_match_to_args
raise _coconut_match_err
return True, [PynndbFilterStep(database=self._database, what=step.what, attrs={}, page_size=self._page_size, txn=self._txn)]
@_coconut_addpattern(_compile)
@_coconut_mark_as_match
def _compile(*_coconut_match_to_args, **_coconut_match_to_kwargs):
_coconut_match_check = False
_coconut_FunctionMatchError = _coconut_get_function_match_error()
if (_coconut.len(_coconut_match_to_args) == 2) and ("self" not in _coconut_match_to_kwargs) and (_coconut.isinstance(_coconut_match_to_args[1], _coconut.abc.Sequence)) and (_coconut.len(_coconut_match_to_args[1]) == 1) and (_coconut.isinstance(_coconut_match_to_args[1][0], EdgeInStep)):
_coconut_match_temp_0 = _coconut_match_to_args[0] if _coconut.len(_coconut_match_to_args) > 0 else _coconut_match_to_kwargs.pop("self")
step = _coconut_match_to_args[1][0]
if not _coconut_match_to_kwargs:
self = _coconut_match_temp_0
_coconut_match_check = True
if not _coconut_match_check:
_coconut_match_val_repr = _coconut.repr(_coconut_match_to_args)
_coconut_match_err = _coconut_FunctionMatchError("pattern-matching failed for " "'addpattern def _compile(self, (step is EdgeInStep, )):'" " in " + (_coconut_match_val_repr if _coconut.len(_coconut_match_val_repr) <= 500 else _coconut_match_val_repr[:500] + "..."))
_coconut_match_err.pattern = 'addpattern def _compile(self, (step is EdgeInStep, )):'
_coconut_match_err.value = _coconut_match_to_args
raise _coconut_match_err
return True, [PynndbEdgeInStep(database=self._database, what=self._edge_cls, attrs=step.attrs, page_size=self._page_size, txn=self._txn)]
@_coconut_addpattern(_compile)
@_coconut_mark_as_match
def _compile(*_coconut_match_to_args, **_coconut_match_to_kwargs):
_coconut_match_check = False
_coconut_FunctionMatchError = _coconut_get_function_match_error()
if (_coconut.len(_coconut_match_to_args) == 2) and ("self" not in _coconut_match_to_kwargs) and (_coconut.isinstance(_coconut_match_to_args[1], _coconut.abc.Sequence)) and (_coconut.len(_coconut_match_to_args[1]) == 1) and (_coconut.isinstance(_coconut_match_to_args[1][0], EdgeOutStep)):
_coconut_match_temp_0 = _coconut_match_to_args[0] if _coconut.len(_coconut_match_to_args) > 0 else _coconut_match_to_kwargs.pop("self")
step = _coconut_match_to_args[1][0]
if not _coconut_match_to_kwargs:
self = _coconut_match_temp_0
_coconut_match_check = True
if not _coconut_match_check:
_coconut_match_val_repr = _coconut.repr(_coconut_match_to_args)
_coconut_match_err = _coconut_FunctionMatchError("pattern-matching failed for " "'addpattern def _compile(self, (step is EdgeOutStep, )):'" " in " + (_coconut_match_val_repr if _coconut.len(_coconut_match_val_repr) <= 500 else _coconut_match_val_repr[:500] + "..."))
_coconut_match_err.pattern = 'addpattern def _compile(self, (step is EdgeOutStep, )):'
_coconut_match_err.value = _coconut_match_to_args
raise _coconut_match_err
return True, [PynndbEdgeOutStep(database=self._database, what=self._edge_cls, attrs=step.attrs, page_size=self._page_size, txn=self._txn)]
@_coconut_addpattern(_compile)
@_coconut_mark_as_match
def _compile(*_coconut_match_to_args, **_coconut_match_to_kwargs):
_coconut_match_check = False
_coconut_FunctionMatchError = _coconut_get_function_match_error()
if (_coconut.len(_coconut_match_to_args) == 2) and ("self" not in _coconut_match_to_kwargs) and (_coconut.isinstance(_coconut_match_to_args[1], _coconut.abc.Sequence)) and (_coconut.len(_coconut_match_to_args[1]) == 1) and (_coconut.isinstance(_coconut_match_to_args[1][0], EdgeAllStep)):
_coconut_match_temp_0 = _coconut_match_to_args[0] if _coconut.len(_coconut_match_to_args) > 0 else _coconut_match_to_kwargs.pop("self")
step = _coconut_match_to_args[1][0]
if not _coconut_match_to_kwargs:
self = _coconut_match_temp_0
_coconut_match_check = True
if not _coconut_match_check:
_coconut_match_val_repr = _coconut.repr(_coconut_match_to_args)
_coconut_match_err = _coconut_FunctionMatchError("pattern-matching failed for " "'addpattern def _compile(self, (step is EdgeAllStep, )):'" " in " + (_coconut_match_val_repr if _coconut.len(_coconut_match_val_repr) <= 500 else _coconut_match_val_repr[:500] + "..."))
_coconut_match_err.pattern = 'addpattern def _compile(self, (step is EdgeAllStep, )):'
_coconut_match_err.value = _coconut_match_to_args
raise _coconut_match_err
return True, [PynndbEdgeAllStep(database=self._database, what=self._edge_cls, attrs=step.attrs, page_size=self._page_size, txn=self._txn)]
@_coconut_addpattern(_compile)
@_coconut_mark_as_match
def _compile(*_coconut_match_to_args, **_coconut_match_to_kwargs):
_coconut_match_check = False
_coconut_FunctionMatchError = _coconut_get_function_match_error()
if (_coconut.len(_coconut_match_to_args) == 2) and ("self" not in _coconut_match_to_kwargs) and (_coconut.isinstance(_coconut_match_to_args[1], _coconut.abc.Sequence)) and (_coconut.len(_coconut_match_to_args[1]) == 2) and (_coconut.isinstance(_coconut_match_to_args[1][0], PynndbFilterStep)) and (_coconut.isinstance(_coconut_match_to_args[1][1], HasStep)):
_coconut_match_temp_0 = _coconut_match_to_args[0] if _coconut.len(_coconut_match_to_args) > 0 else _coconut_match_to_kwargs.pop("self")
step0 = _coconut_match_to_args[1][0]
step1 = _coconut_match_to_args[1][1]
if not _coconut_match_to_kwargs:
self = _coconut_match_temp_0
_coconut_match_check = True
if not _coconut_match_check:
_coconut_match_val_repr = _coconut.repr(_coconut_match_to_args)
_coconut_match_err = _coconut_FunctionMatchError("pattern-matching failed for " "'addpattern def _compile(self, (step0 is PynndbFilterStep, step1 is HasStep)):'" " in " + (_coconut_match_val_repr if _coconut.len(_coconut_match_val_repr) <= 500 else _coconut_match_val_repr[:500] + "..."))
_coconut_match_err.pattern = 'addpattern def _compile(self, (step0 is PynndbFilterStep, step1 is HasStep)):'
_coconut_match_err.value = _coconut_match_to_args
raise _coconut_match_err
attrs = {**step0.attrs, **step1.attrs}
return False, [PynndbFilterStep(database=self._database, what=step0.what, attrs=attrs, page_size=self._page_size, txn=self._txn)]
@_coconut_tco
def _make_optimized_pynndb_edge_step(self, step0: 'PynndbStep', step1: 'PynndbStep', edge_type: 'EdgeType') -> 'PynndbFilterStep':
attrs_key = "end" if edge_type == EdgeType.IN else "start"
return _coconut_tail_call(PynndbFilterStep, database=self._database, what=step0.what._edge_class, attrs={attrs_key: step0.attrs, **step1.attrs}, page_size=self._page_size, txn=self._txn)
@_coconut_addpattern(_compile)
@_coconut_mark_as_match
def _compile(*_coconut_match_to_args, **_coconut_match_to_kwargs):
_coconut_match_check = False
_coconut_FunctionMatchError = _coconut_get_function_match_error()
if (_coconut.len(_coconut_match_to_args) == 2) and ("self" not in _coconut_match_to_kwargs) and (_coconut.isinstance(_coconut_match_to_args[1], _coconut.abc.Sequence)) and (_coconut.len(_coconut_match_to_args[1]) == 2) and (_coconut.isinstance(_coconut_match_to_args[1][0], PynndbFilterStep)) and (_coconut.isinstance(_coconut_match_to_args[1][1], PynndbEdgeInStep)):
_coconut_match_temp_0 = _coconut_match_to_args[0] if _coconut.len(_coconut_match_to_args) > 0 else _coconut_match_to_kwargs.pop("self")
step0 = _coconut_match_to_args[1][0]
step1 = _coconut_match_to_args[1][1]
if not _coconut_match_to_kwargs:
self = _coconut_match_temp_0
_coconut_match_check = True
if _coconut_match_check and not ((issubclass)(step0.what, Node)):
_coconut_match_check = False
if not _coconut_match_check:
_coconut_match_val_repr = _coconut.repr(_coconut_match_to_args)
_coconut_match_err = _coconut_FunctionMatchError("pattern-matching failed for " "'addpattern def _compile(self, (step0 is PynndbFilterStep, step1 is PynndbEdgeInStep) if step0.what `issubclass` Node):'" " in " + (_coconut_match_val_repr if _coconut.len(_coconut_match_val_repr) <= 500 else _coconut_match_val_repr[:500] + "..."))
_coconut_match_err.pattern = 'addpattern def _compile(self, (step0 is PynndbFilterStep, step1 is PynndbEdgeInStep) if step0.what `issubclass` Node):'
_coconut_match_err.value = _coconut_match_to_args
raise _coconut_match_err
return False, [self._make_optimized_pynndb_edge_step(step0, step1, EdgeType.IN)]
@_coconut_addpattern(_compile)
@_coconut_mark_as_match
def _compile(*_coconut_match_to_args, **_coconut_match_to_kwargs):
_coconut_match_check = False
_coconut_FunctionMatchError = _coconut_get_function_match_error()
if (_coconut.len(_coconut_match_to_args) == 2) and ("self" not in _coconut_match_to_kwargs) and (_coconut.isinstance(_coconut_match_to_args[1], _coconut.abc.Sequence)) and (_coconut.len(_coconut_match_to_args[1]) == 2) and (_coconut.isinstance(_coconut_match_to_args[1][0], PynndbFilterStep)) and (_coconut.isinstance(_coconut_match_to_args[1][1], PynndbEdgeOutStep)):
_coconut_match_temp_0 = _coconut_match_to_args[0] if _coconut.len(_coconut_match_to_args) > 0 else _coconut_match_to_kwargs.pop("self")
step0 = _coconut_match_to_args[1][0]
step1 = _coconut_match_to_args[1][1]
if not _coconut_match_to_kwargs:
self = _coconut_match_temp_0
_coconut_match_check = True
if _coconut_match_check and not ((issubclass)(step0.what, Node)):
_coconut_match_check = False
if not _coconut_match_check:
_coconut_match_val_repr = _coconut.repr(_coconut_match_to_args)
_coconut_match_err = _coconut_FunctionMatchError("pattern-matching failed for " "'addpattern def _compile(self, (step0 is PynndbFilterStep, step1 is PynndbEdgeOutStep) if step0.what `issubclass` Node):'" " in " + (_coconut_match_val_repr if _coconut.len(_coconut_match_val_repr) <= 500 else _coconut_match_val_repr[:500] + "..."))
_coconut_match_err.pattern = 'addpattern def _compile(self, (step0 is PynndbFilterStep, step1 is PynndbEdgeOutStep) if step0.what `issubclass` Node):'
_coconut_match_err.value = _coconut_match_to_args
raise _coconut_match_err
return False, [self._make_optimized_pynndb_edge_step(step0, step1, EdgeType.OUT)]
@_coconut_addpattern(_compile)
@_coconut_mark_as_match
def _compile(*_coconut_match_to_args, **_coconut_match_to_kwargs):
_coconut_match_check = False
_coconut_FunctionMatchError = _coconut_get_function_match_error()
if (_coconut.len(_coconut_match_to_args) == 2) and ("self" not in _coconut_match_to_kwargs) and (_coconut.isinstance(_coconut_match_to_args[1], _coconut.abc.Sequence)) and (_coconut.len(_coconut_match_to_args[1]) == 2) and (_coconut.isinstance(_coconut_match_to_args[1][0], PynndbFilterStep)) and (_coconut.isinstance(_coconut_match_to_args[1][1], PynndbEdgeAllStep)):
_coconut_match_temp_0 = _coconut_match_to_args[0] if _coconut.len(_coconut_match_to_args) > 0 else _coconut_match_to_kwargs.pop("self")
step0 = _coconut_match_to_args[1][0]
step1 = _coconut_match_to_args[1][1]
if not _coconut_match_to_kwargs:
self = _coconut_match_temp_0
_coconut_match_check = True
if _coconut_match_check and not ((issubclass)(step0.what, Node)):
_coconut_match_check = False
if not _coconut_match_check:
_coconut_match_val_repr = _coconut.repr(_coconut_match_to_args)
_coconut_match_err = _coconut_FunctionMatchError("pattern-matching failed for " "'addpattern def _compile(self, (step0 is PynndbFilterStep, step1 is PynndbEdgeAllStep) if step0.what `issubclass` Node):'" " in " + (_coconut_match_val_repr if _coconut.len(_coconut_match_val_repr) <= 500 else _coconut_match_val_repr[:500] + "..."))
_coconut_match_err.pattern = 'addpattern def _compile(self, (step0 is PynndbFilterStep, step1 is PynndbEdgeAllStep) if step0.what `issubclass` Node):'
_coconut_match_err.value = _coconut_match_to_args
raise _coconut_match_err
return False, [PynndbUnionStep(database=self._database, page_size=self._page_size, steps=[self._make_optimized_pynndb_edge_step(step0, step1, EdgeType.IN), self._make_optimized_pynndb_edge_step(step0, step1, EdgeType.OUT),], txn=self._txn)]
@_coconut_addpattern(_compile)
@_coconut_mark_as_match
def _compile(*_coconut_match_to_args, **_coconut_match_to_kwargs):
_coconut_match_check = False
_coconut_FunctionMatchError = _coconut_get_function_match_error()
if (_coconut.len(_coconut_match_to_args) <= 2) and (_coconut.sum((_coconut.len(_coconut_match_to_args) > 0, "self" in _coconut_match_to_kwargs)) == 1) and (_coconut.sum((_coconut.len(_coconut_match_to_args) > 1, "steps" in _coconut_match_to_kwargs)) == 1):
_coconut_match_temp_0 = _coconut_match_to_args[0] if _coconut.len(_coconut_match_to_args) > 0 else _coconut_match_to_kwargs.pop("self")
_coconut_match_temp_1 = _coconut_match_to_args[1] if _coconut.len(_coconut_match_to_args) > 1 else _coconut_match_to_kwargs.pop("steps")
if not _coconut_match_to_kwargs:
self = _coconut_match_temp_0
steps = _coconut_match_temp_1
_coconut_match_check = True
if not _coconut_match_check:
_coconut_match_val_repr = _coconut.repr(_coconut_match_to_args)
_coconut_match_err = _coconut_FunctionMatchError("pattern-matching failed for " "'addpattern def _compile(self, steps):'" " in " + (_coconut_match_val_repr if _coconut.len(_coconut_match_val_repr) <= 500 else _coconut_match_val_repr[:500] + "..."))
_coconut_match_err.pattern = 'addpattern def _compile(self, steps):'
_coconut_match_err.value = _coconut_match_to_args
raise _coconut_match_err
return True, steps
def _compile_all(self) -> 'None':
if self._is_compiled:
return
self._compiled_steps = []
max_step_count_for_compiled_step = 2
steps = self._steps
for step_count in range(1, max_step_count_for_compiled_step + 1):
i = 0
j = i + step_count
while i < len(steps):
steps_window = steps[i:j]
go_forward, new_steps = self._compile(steps_window)
steps = steps[:i] + new_steps + steps[j:]
if go_forward:
i += 1
j += 1
self._compiled_steps = steps
self._is_compiled = True
def __iter__(self):
self._compile_all()
last = len(self._compiled_steps) - 1
exhausted = False
entities = None
i = 0
while not exhausted:
step = self._compiled_steps[i]
if i > 0 and entities:
step.input(entities)
entities = step.output()
if not entities:
i -= 1
if i < 0:
exhausted = True
else:
if i < last:
i += 1
self._compiled_steps[i].reset_iter()
else:
yield from entities
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.