Skip to content

Instantly share code, notes, and snippets.

Created March 21, 2018 07:58
Show Gist options
  • Save badocelot/a2f4a50ef8498230911194634fcae0c1 to your computer and use it in GitHub Desktop.
Save badocelot/a2f4a50ef8498230911194634fcae0c1 to your computer and use it in GitHub Desktop.
Streams with a Python twist
import functools, itertools
class StreamIterator:
Iterator class for a Stream
This class manages the cache for a regular Stream; it is unnecessary for a
def __init__(self, source, cache):
self.__source = source
self.__cache = cache
self.__index = 0
def __iter__(self):
return self
def __next__(self):
if self.__index < len(self.__cache):
# if there's more left in cache, use it
result = self.__cache[self.__index]
# otherwise, take a new item off source and cache it
result = next(self.__source)
# if not at the end, move up; if at the end, keep it there
self.__index += 1
return result
class Stream:
"""Stream wrapping around an iterable, providing convenient methods."""
def __init__(self, iterable):
"""Creates a Stream from the iterable"""
self.__source = iter(iterable)
self.__cache = []
def __add__(self, iterable):
Returns a Stream that represents the concatenation of this Stream
with another iterable, with the items of this Stream first
return Stream(itertools.chain(self, iterable))
def __iter__(self):
"""Returns a StreamIterator for this Stream"""
return StreamIterator(self.__source, self.__cache)
def __mul__(self, n):
Returns a Stream that repeats the contents of this Stream `n` times
Do not use this operator on infinite Streams!
return self.repeat(n)
def __radd__(self, iterable):
Returns a Stream that represents the concatenation of this Stream
with another iterable, with the items of the iterable first
return Stream(itertools.chain(iterable, self))
def __rmul__(self, n):
Returns a Stream that repeats the contents of this Stream `n` times
Do not use this operator on infinite Streams!
return self.repeat(n)
def __len__(self):
Returns the number of items in this Stream
Do not use len() on infinite Streams!
return sum(1 for _ in self)
def all(self, pred = True):
Returns whether all items of this Stream match the given predicate
If no predicate is provided, returns whether all items in this Stream
are truthy, as if `bool` were the predicate
Do no use this method on infinite Streams!
if pred is None:
return all(self)
return all(pred(x) for x in self)
def any(self, pred = None):
Returns whether this Stream has any items matching the given predicate
If no predicate is provided, returns whether this Stream has any items
at all
if pred is None:
return self.at_least(1)
return any(pred(x) for x in self)
def at_least(self, how_many):
Returns whether this Stream has at least `how_many` items
iterator = iter(self)
for i in range(how_many):
return True
except StopIteration:
return False
def chain(self, iterable):
Returns a Stream that represents the concatenation of this Stream
with another iterable, with the items of this Stream first
Equivalent to self + iterable
return self + iterable
def count(self, pred = None):
Returns the number of items matching a specific predicate
Returns the overall number of items if no predicate is given
Example Usage
>>> st = stream([1,2,3,4,5])
>>> st.count(lambda x: x % 2 == 0) # => 2
>>> st.count() # => 5
if pred is None:
return len(self)
return sum(1 for x in self if pred(x))
def drop(self, how_many):
Returns a Stream with the items of this Stream, except the first
def inner(iterable, how_many: int):
for i in range(how_many):
for item in iterable:
yield item
return Stream(inner(iter(self), how_many))
def empty(self):
Returns whether this Stream has no items
Equivalent to `not self.any()`
return not self.any()
def filter(self, pred):
Returns a Stream that containing only the items in this Stream that
meet the given predicate (i.e. cause it to return True)
Example Usage
>>> st = stream([1,2,3,4,5])
>>> st.filter(lambda x: x % 2 == 0) # => Stream with 2, 4
return Stream(filter(pred, self))
def first(self, pred = None):
Returns the first item in this Stream matching the given predicate
If no predicate is provided, returns the first item in this Stream
if pred is None:
return next(iter(self))
return next(x for x in self if pred(x))
def foldr(self, func, initial):
Returns the result of performing the fold-right operation on this
Stream with the given function and initial value.
Do not use this method on infinite Streams!
return functools.reduce(lambda x,y: func(y,x), reversed(self), initial)
def last(self, pred = None):
Returns the last item in this Stream that matches the given predicate
If no predicate is provided, returns the last item in this Stream
Do not use this method on infinite Streams!
return self.reversed().first(pred)
def list(self):
Returns the items of this Stream in a list
Equivalent to ``
Do not use this method on infinite Streams!
return list(self)
def map(self, func):
Returns a Stream containing the results of calling func() with each
item in this stream, similar to map()
Example Usage
>>> st = stream([1,2,3,4,5])
>>> x: x * x) # => Stream with 1, 4, 9, 16, 25
>>> # => Stream with 1.0, 2.0, 3.0, 4.0, 5.0
return Stream(map(func, self))
def reduce(self, func, initial):
Returns the result of the reduce operation using the given function and
initial value
return functools.reduce(func, self, initial)
def repeat(self, n = 2):
Returns a Stream that repeats the contents of this Stream `n` times
Do not use this method on infinite Streams!
return Stream(itertools.chain.from_iterable(self.tee(n)))
def reversed(self):
Returns a reversed version of this Stream.
Do not use this method on infinite Streams!
return Stream(reversed(self))
def set(self):
Returns the items of this Stream in a set
Equivalent to ``
Do not use this method on infinite Streams!
return set(self)
def sort(self, key = None):
Returns a sorted version of this Stream, using `key` as a key function
Do not use this method on infinite Streams!
return Stream(sorted(self, key=key))
def take(self, num):
"""Returns a Stream containing the first `num` items of this Stream"""
iterator = iter(self)
return Stream(next(iterator) for _ in range(num))
def tee(self, n = 2):
Returns a tuple containing `n` duplicates of this Stream
Do not use this on infinite Streams!
return tuple(Stream(t) for t in itertools.tee(self, n))
def to(self, type_):
Returns the Stream converted to `type_`
`type_` must be a type constructor that takes an iterable
Example Usage
>>> st = stream([1,2,3,4,5])
>>> # => [1, 2, 3, 4, 5]
>>> # => (1, 2, 3, 4, 5)
>>> # => {1, 2, 3, 4, 5}
>>> # => [1.0, 2.0, 3.0, 4.0, 5.0]
Do not use this on infinite Streams, unless the type constructor knows
how to deal with them.
return type_(self)
def tuple(self):
Returns the items of this Stream in a tuple
Equivalent to ``
Do not use this method on infinite Streams!
return tuple(self)
class ContainerStream(Stream):
Stream wrapping around a finite iterable container
Does no caching, unlike regular Stream; assumes that calling iter() on the
provided container will always return a fresh iterator with all items in
the container
def __init__(self, container):
"""Creates a Stream wrapping around the container"""
self.__container = container
def __iter__(self):
"""Returns the iterator for this Stream"""
return iter(self.__container)
def stream(iterable, use_container = None):
Returns a Stream for the iterable
Will use ContainerStream if `use_container` is True
If `use_container` is not provided, built-in container types will be
automatically upgraded to ContainerStream; to prevent this, pass
`use_container` as False
builtins = (dict, list, set, tuple)
return_container = use_container or \
use_container is None and type(iterable) in builtins
return ContainerStream(iterable) if return_container else Stream(iterable)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment