Skip to content

Instantly share code, notes, and snippets.

@a-recknagel
Last active February 8, 2023 12:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save a-recknagel/8fd4858a8139ac4835b7f2a08b0996c1 to your computer and use it in GitHub Desktop.
Save a-recknagel/8fd4858a8139ac4835b7f2a08b0996c1 to your computer and use it in GitHub Desktop.
lazy pipeline object
from typing import Iterable
import itertools
class Stream:
    """Lazy, chainable pipeline over an iterable.

    Every transformation returns a new ``Stream`` wrapping a generator, so no
    element is processed until the final stream is iterated. The wrapped
    iterable is looked up on ``self.stream`` only when iteration starts.
    """

    def __init__(self, stream: Iterable):
        """Wrap *stream*, which may be a re-iterable container or a one-shot iterator."""
        self.stream = stream

    def __iter__(self):
        """Yield the elements of the wrapped iterable."""
        yield from self.stream

    def _with_suffix(self, suffix: str) -> "Stream":
        """Return a stream that logs the step and appends *suffix* to each element."""
        def inner():
            # ``self.stream`` is read here, at first iteration, not when the
            # pipeline is built -- this keeps the stage lazy.
            for item in self.stream:
                print(f" adding {suffix}")
                yield item + suffix
        return Stream(inner())

    def add_a(self) -> "Stream":
        """Return a stream whose elements have ``"a"`` appended."""
        return self._with_suffix("a")

    def add_b(self) -> "Stream":
        """Return a stream whose elements have ``"b"`` appended."""
        return self._with_suffix("b")

    def duplicate(self) -> "Stream":
        """Return a stream that yields every element once, then all of them again.

        The first pass is lazy; elements are buffered in a list as they go by so
        the second pass can replay them without re-running upstream stages.
        """
        def inner():
            print(" first")
            replay = []
            for item in self.stream:
                yield item
                replay.append(item)
            print(" second")
            yield from replay
        return Stream(inner())

    def elem_type(self) -> type:
        """Return the type of the first element without losing that element.

        Only a single element is pulled through the pipeline. If the wrapped
        object is a one-shot iterator, the element is pushed back so a later
        iteration still sees it; a re-iterable container is left untouched.

        Raises:
            StopIteration: if the stream is empty.
            TypeError: if the wrapped object is not iterable, or if an upstream
                stage raises ``TypeError`` while producing the first element.
        """
        source = self.stream
        iterator = iter(source)
        # A single next() call: an exception raised by an upstream stage
        # propagates unchanged instead of being mistaken for "not an iterator".
        first = next(iterator)
        if iterator is source:
            # ``source`` is its own iterator, so the element was consumed for
            # good -- put it back in front of the remaining elements.
            self.stream = itertools.chain([first], iterator)
        return type(first)
# Iterating exhausts the complete stream lazily: each element passes through every stage before the next one is read, with no aggregation between stages, so pipeline bugs are discovered quickly
>>> for element in Stream("12345").add_a().add_b().duplicate():
... print(element)
first
adding a
adding b
1ab
adding a
adding b
2ab
adding a
adding b
3ab
adding a
adding b
4ab
adding a
adding b
5ab
second
1ab
2ab
3ab
4ab
5ab
# An operation that requires only one fully processed element stops without processing the rest of the data
>>> Stream("12345").add_a().add_b().duplicate().elem_type()
first
adding a
adding b
<class 'str'>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment