Skip to content

Instantly share code, notes, and snippets.

@jrydberg
Created November 13, 2012 07:05
Show Gist options
  • Save jrydberg/4064426 to your computer and use it in GitHub Desktop.
Save jrydberg/4064426 to your computer and use it in GitHub Desktop.
Functional-programming style sequences.
"""Functional-programming style sequences."""
import itertools
import functools
import operator as op
class Blank(object):
def __init__(self, pos=None):
self.pos = pos
# Our "blank" objects
_1, _2, _3, _4 = Blank(0), Blank(1), Blank(2), Blank(3)
class BlankedInvocation(object):
    """Callable that fills L{Blank} placeholders and then invokes C{fn}.

    @param fn: the function to call.
    @param posargs: positional argument template; every L{Blank} in it
        is replaced by the call-time value at index C{Blank.pos}, all
        other entries are passed through unchanged.
    @param kwargs: keyword arguments passed to C{fn} as-is.
    """

    def __init__(self, fn, posargs, kwargs):
        self.fn = fn
        self.posargs = posargs
        self.kwargs = kwargs

    def padnone(self, iterable):
        """Returns the sequence elements and then returns None indefinitely.

        Useful for emulating the behavior of the built-in map() function.
        """
        return itertools.chain(iterable, itertools.repeat(None))

    def __call__(self, *values):
        """Call C{fn} with the blanks replaced by C{values}.

        Missing values are substituted with None.
        """
        blank_positions = [arg.pos for arg in self.posargs
                           if isinstance(arg, Blank)]
        # Pad far enough to cover the highest referenced position.  Padding
        # only to len(posargs) raised IndexError for templates such as
        # (_2,), where the blank index exceeds the template length.
        width = max([len(self.posargs)] +
                    [pos + 1 for pos in blank_positions])
        padded = list(itertools.islice(self.padnone(values), width))
        args = [padded[arg.pos] if isinstance(arg, Blank) else arg
                for arg in self.posargs]
        return self.fn(*args, **self.kwargs)
def fnliteral(*args, **kw):
    """Build a callable from a function and a partial argument template.

    The first positional argument is the function; the remaining ones
    are its argument template.  This acts like L{functools.partial} in
    some ways, except that L{Blank} placeholders allow the call-time
    value to be placed at any position.  Example:

        >>> fnliteral(op.add, _1, 20)(10)
        30

    With no arguments at all the result maps truthy values to
    themselves and falsy values to None.  With only a function, the
    result forwards all call-time arguments to it, together with C{kw}.
    """
    if not args:
        return lambda x: x if x else None

    if len(args) == 1:
        target = args[0]
        return lambda *a: target(*a, **kw)

    template = args[1:]
    for candidate in template:
        if isinstance(candidate, Blank):
            # At least one placeholder: values are substituted at call time.
            return BlankedInvocation(args[0], template, kw)

    # No placeholders: call-time values are simply appended.
    return functools.partial(*args, **kw)
class _BaseSeq:
"""Base class of our sequences that expects that C{self} is an
iterable of some form.
"""
def take(self, n):
return self.slice(None, n)
def takewhile(self, *args, **kw):
predicate = fnliteral(*args, **kw)
return self.__class__(itertools.takewhile(predicate, self))
def some(self, *args, **kw):
predicate = fnliteral(*args, **kw)
for value in self:
if predicate(value):
return value
return None
def filter(self, *args, **kw):
fn = fnliteral(*args, **kw)
return self.__class__(elem for elem in self if fn(elem))
def filterfalse(self, *args, **kw):
fn = fnliteral(*args, **kw)
return self.__class__(elem for elem in self if not fn(elem))
def map(self, *args, **kw):
fn = fnliteral(*args, **kw)
return self.__class__(fn(elem) for elem in self)
def concat(self, *other):
return self.__class__(itertools.chain(self, *other))
def count(self, fn=bool):
return sum(1 for elem in self if fn(elem))
def reduce(self, *args, **kw):
"""Reduce content of sequence down to a single value.
"""
fn = fnliteral(*args, **kw)
it = iter(self)
ret = next(it)
for value in it:
ret = fn(ret, value)
return ret
def first(self):
"""Return the first element or None."""
return next(self, None)
def dropwhile(self, *args, **kw):
predicate = fnliteral(*args, **kw)
return self.__class__(itertools.dropwhile(predicate, self))
def slice(self, *args):
return self.__class__(itertools.islice(self, *args))
def zip(self, *iterables):
return self.__class__(itertools.izip(self, *iterables))
def enumerate(self):
return self.__class__(enumerate(self))
def append(self, elem):
return self.__class__(itertools.chain(self, [elem]))
def prepend(self, elem):
return self.__class__(itertools.chain([elem], self))
def rest(self):
"""Return a sequence of all but the first element."""
it = iter(self)
try:
next(it)
except StopIteration:
it = iter([])
return self.__class__(it)
def nth(self, n, default=None):
"""Returns the nth element or a default value"""
return next(itertools.islice(self, n, None), default)
def reverse(self):
"""Return reversed sequence."""
return self.__class__(reversed(self))
def remove(self, toremove):
def produce():
it = iter(self)
for elem in it:
if elem == toremove:
break
yield elem
for elem in it:
yield elem
return self.__class__(produce())
def flatten(self):
"""Flatten one level of nesting."""
return self.__class__(itertools.chain.from_iterable(self))
def sort(self, cmp=None, key=None, reverse=False):
"""Return sorted sequence."""
return self.__class__(sorted(self, cmp=cmp, key=key,
reverse=reverse))
def do(self, *args, **kw):
fn = fnliteral(*args, **kw)
for value in self:
fn(value)
# _BaseSeq is listed before object: with the reverse order Python 3 cannot
# build a consistent MRO and refuses to create the class.
class lazy_seq(_BaseSeq, object):
    """A lazy sequence, that operate on iterable.

    Note that this sequence is "one shot" in the sense that it
    is only possible to iterate over it once.

    If you want to make the content permanent, use the C{save} method.
    """

    def __init__(self, iterable):
        self._iterable = iter(iterable)

    def __iter__(self):
        # Always the same underlying iterator, hence "one shot".
        return self._iterable

    def __repr__(self):
        return 'lazy_seq(...)'

    def tee(self, n=2):
        """Split into a list of C{n} lazy sequences over the same
        elements.
        """
        # Built as a real list: on Python 3, map() would hand back a
        # one-shot iterator that can be neither indexed nor re-read.
        return [lazy_seq(branch) for branch in itertools.tee(self, n)]

    def save(self):
        """Consume the remaining elements into a L{seq}."""
        return seq(self)

    # Aliases of save().  Inside save() the name C{seq} still refers to
    # the module-level class, not to this attribute.
    seq = persist = save
class seq(_BaseSeq, list):
    """An immutable sequence of elements.

    Item assignment is rejected and C{insert} returns a new sequence
    instead of modifying this one.  Other mutating methods inherited
    from C{list} are not overridden in this class.
    """

    def __getitem__(self, elem):
        """Return the element at an index, or a new L{seq} for a slice."""
        if isinstance(elem, slice):
            return seq(list.__getitem__(self, elem))
        return list.__getitem__(self, elem)

    def __setitem__(self, index, value):
        """Reject item assignment.

        @raise TypeError: always.
        """
        # Takes the full (index, value) pair so that the TypeError raised
        # is the intended one and not an accidental arity error.
        raise TypeError("'seq' object does not support item assignment")

    def insert(self, index, elem):
        """Return a new L{seq} with C{elem} inserted before C{index}."""
        # FIXME: do this more elegantly
        data = list(self)
        data.insert(index, elem)
        return seq(data)

    def __repr__(self):
        return 'seq(%r)' % (list(self),)
def test():
    """Smoke test: run a small pipeline end to end and print the result.

    The pipeline yields 1, 2 -> 20, 1, 2 -> 1, 2 -> 2, 4 -> 4.0, 8.0
    -> 4, 8, which reduces to 12.
    """
    result = (lazy_seq(itertools.count(1))
              .slice(0, 2)
              .prepend(20)
              .rest()
              .map(op.add, _1, _1)        # x + x
              # operator.div is gone on Python 3; with a float operand
              # truediv gives the same result on Python 2.
              .map(op.truediv, _1, 0.5)   # x / 0.5
              .map(int)
              ).seq().reduce(op.add, _1, _2)
    # A single parenthesized argument prints the same on Python 2 and 3.
    print(result)
# Run the smoke test only when this file is executed as a script.
if __name__ == '__main__':
    test()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment