Skip to content

Instantly share code, notes, and snippets.

@badocelot
Created March 21, 2018 07:58
Show Gist options
  • Save badocelot/a2f4a50ef8498230911194634fcae0c1 to your computer and use it in GitHub Desktop.
Save badocelot/a2f4a50ef8498230911194634fcae0c1 to your computer and use it in GitHub Desktop.
Streams with a Python twist
import functools, itertools
class StreamIterator:
"""
Iterator class for a Stream
This class manages the cache for a regular Stream; it is unnecessary for a
ContainerStream.
"""
def __init__(self, source, cache):
self.__source = source
self.__cache = cache
self.__index = 0
def __iter__(self):
return self
def __next__(self):
if self.__index < len(self.__cache):
# if there's more left in cache, use it
result = self.__cache[self.__index]
else:
# otherwise, take a new item off source and cache it
result = next(self.__source)
self.__cache.append(result)
# if not at the end, move up; if at the end, keep it there
self.__index += 1
return result
class Stream:
"""Stream wrapping around an iterable, providing convenient methods."""
def __init__(self, iterable):
"""Creates a Stream from the iterable"""
self.__source = iter(iterable)
self.__cache = []
def __add__(self, iterable):
"""
Returns a Stream that represents the concatenation of this Stream
with another iterable, with the items of this Stream first
"""
return Stream(itertools.chain(self, iterable))
def __iter__(self):
"""Returns a StreamIterator for this Stream"""
return StreamIterator(self.__source, self.__cache)
def __mul__(self, n):
"""
Returns a Stream that repeats the contents of this Stream `n` times
WARNING
-------
Do not use this operator on infinite Streams!
"""
return self.repeat(n)
def __radd__(self, iterable):
"""
Returns a Stream that represents the concatenation of this Stream
with another iterable, with the items of the iterable first
"""
return Stream(itertools.chain(iterable, self))
def __rmul__(self, n):
"""
Returns a Stream that repeats the contents of this Stream `n` times
WARNING
-------
Do not use this operator on infinite Streams!
"""
return self.repeat(n)
def __len__(self):
"""
Returns the number of items in this Stream
WARNING
-------
Do not use len() on infinite Streams!
"""
return sum(1 for _ in self)
def all(self, pred = True):
"""
Returns whether all items of this Stream match the given predicate
If no predicate is provided, returns whether all items in this Stream
are truthy, as if `bool` were the predicate
WARNING
-------
Do no use this method on infinite Streams!
"""
if pred is None:
return all(self)
else:
return all(pred(x) for x in self)
def any(self, pred = None):
"""
Returns whether this Stream has any items matching the given predicate
If no predicate is provided, returns whether this Stream has any items
at all
"""
if pred is None:
return self.at_least(1)
else:
return any(pred(x) for x in self)
def at_least(self, how_many):
"""
Returns whether this Stream has at least `how_many` items
"""
try:
iterator = iter(self)
for i in range(how_many):
next(iterator)
return True
except StopIteration:
return False
def chain(self, iterable):
"""
Returns a Stream that represents the concatenation of this Stream
with another iterable, with the items of this Stream first
Equivalent to self + iterable
"""
return self + iterable
def count(self, pred = None):
"""
Returns the number of items matching a specific predicate
Returns the overall number of items if no predicate is given
Example Usage
-------------
>>> st = stream([1,2,3,4,5])
>>> st.count(lambda x: x % 2 == 0) # => 2
>>> st.count() # => 5
"""
if pred is None:
return len(self)
else:
return sum(1 for x in self if pred(x))
def drop(self, how_many):
"""
Returns a Stream with the items of this Stream, except the first
`how_many`
"""
def inner(iterable, how_many: int):
for i in range(how_many):
next(iterable)
for item in iterable:
yield item
return Stream(inner(iter(self), how_many))
def empty(self):
"""
Returns whether this Stream has no items
Equivalent to `not self.any()`
"""
return not self.any()
def filter(self, pred):
"""
Returns a Stream that containing only the items in this Stream that
meet the given predicate (i.e. cause it to return True)
Example Usage
-------------
>>> st = stream([1,2,3,4,5])
>>> st.filter(lambda x: x % 2 == 0) # => Stream with 2, 4
"""
return Stream(filter(pred, self))
def first(self, pred = None):
"""
Returns the first item in this Stream matching the given predicate
If no predicate is provided, returns the first item in this Stream
"""
if pred is None:
return next(iter(self))
else:
return next(x for x in self if pred(x))
def foldr(self, func, initial):
"""
Returns the result of performing the fold-right operation on this
Stream with the given function and initial value.
WARNING
-------
Do not use this method on infinite Streams!
"""
return functools.reduce(lambda x,y: func(y,x), reversed(self), initial)
def last(self, pred = None):
"""
Returns the last item in this Stream that matches the given predicate
If no predicate is provided, returns the last item in this Stream
WARNING
-------
Do not use this method on infinite Streams!
"""
return self.reversed().first(pred)
def list(self):
"""
Returns the items of this Stream in a list
Equivalent to `self.to(list)`
WARNING
-------
Do not use this method on infinite Streams!
"""
return list(self)
def map(self, func):
"""
Returns a Stream containing the results of calling func() with each
item in this stream, similar to map()
Example Usage
-------------
>>> st = stream([1,2,3,4,5])
>>> st.map(lambda x: x * x) # => Stream with 1, 4, 9, 16, 25
>>> st.map(float) # => Stream with 1.0, 2.0, 3.0, 4.0, 5.0
"""
return Stream(map(func, self))
def reduce(self, func, initial):
"""
Returns the result of the reduce operation using the given function and
initial value
"""
return functools.reduce(func, self, initial)
def repeat(self, n = 2):
"""
Returns a Stream that repeats the contents of this Stream `n` times
WARNING
-------
Do not use this method on infinite Streams!
"""
return Stream(itertools.chain.from_iterable(self.tee(n)))
def reversed(self):
"""
Returns a reversed version of this Stream.
WARNING
-------
Do not use this method on infinite Streams!
"""
return Stream(reversed(self))
def set(self):
"""
Returns the items of this Stream in a set
Equivalent to `self.to(set)`
WARNING
-------
Do not use this method on infinite Streams!
"""
return set(self)
def sort(self, key = None):
"""
Returns a sorted version of this Stream, using `key` as a key function
WARNING
-------
Do not use this method on infinite Streams!
"""
return Stream(sorted(self, key=key))
def take(self, num):
"""Returns a Stream containing the first `num` items of this Stream"""
iterator = iter(self)
return Stream(next(iterator) for _ in range(num))
def tee(self, n = 2):
"""
Returns a tuple containing `n` duplicates of this Stream
WARNING
-------
Do not use this on infinite Streams!
"""
return tuple(Stream(t) for t in itertools.tee(self, n))
def to(self, type_):
"""
Returns the Stream converted to `type_`
`type_` must be a type constructor that takes an iterable
Example Usage
-------------
>>> st = stream([1,2,3,4,5])
>>> st.to(list) # => [1, 2, 3, 4, 5]
>>> st.to(tuple) # => (1, 2, 3, 4, 5)
>>> st.to(set) # => {1, 2, 3, 4, 5}
>>> st.map(float).to(list) # => [1.0, 2.0, 3.0, 4.0, 5.0]
WARNING
-------
Do not use this on infinite Streams, unless the type constructor knows
how to deal with them.
"""
return type_(self)
def tuple(self):
"""
Returns the items of this Stream in a tuple
Equivalent to `self.to(tuple)`
WARNING
-------
Do not use this method on infinite Streams!
"""
return tuple(self)
class ContainerStream(Stream):
"""
Stream wrapping around a finite iterable container
Does no caching, unlike regular Stream; assumes that calling iter() on the
provided container will always return a fresh iterator with all items in
the container
"""
def __init__(self, container):
"""Creates a Stream wrapping around the container"""
self.__container = container
def __iter__(self):
"""Returns the iterator for this Stream"""
return iter(self.__container)
def stream(iterable, use_container = None):
"""
Returns a Stream for the iterable
Will use ContainerStream if `use_container` is True
If `use_container` is not provided, built-in container types will be
automatically upgraded to ContainerStream; to prevent this, pass
`use_container` as False
"""
builtins = (dict, list, set, tuple)
return_container = use_container or \
use_container is None and type(iterable) in builtins
return ContainerStream(iterable) if return_container else Stream(iterable)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment