Skip to content

Instantly share code, notes, and snippets.

@davisagli
Last active February 27, 2020 05:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save davisagli/123f303550653618e20001677271c989 to your computer and use it in GitHub Desktop.
Save davisagli/123f303550653618e20001677271c989 to your computer and use it in GitHub Desktop.
pypes
import inspect
import itertools
class Pipe:
    """Wrap a value so callables can be chained onto it with ``|``.

    ``Pipe(x) | f`` calls ``f`` on the wrapped value and wraps the result in
    a new ``Pipe``, which allows ``Pipe(x) | f | g | h``.
    """

    def __init__(self, source):
        # The wrapped value; later stages read it back from this attribute.
        self.source = source

    def __or__(self, sink):
        """Feed the wrapped value to *sink* and return the result in a Pipe.

        When the wrapped value is a generator object and *sink* is not a
        generator function, the generator is first drained into a list so
        that the sink receives a concrete sequence. In every other case the
        wrapped value is handed to *sink* unchanged.
        """
        upstream = self.source
        needs_list = inspect.isgenerator(upstream) and not inspect.isgeneratorfunction(sink)
        argument = list(upstream) if needs_list else upstream
        return Pipe(sink(argument))
def counter():
    """Yield the integers 1, 2, 3, ... without end."""
    # Kept as a generator function (not a bare itertools.count) so that
    # calling it still produces a generator object.
    for value in itertools.count(1):
        yield value
def every_nth(n):
    """Return a generator function that yields every *n*-th item of a stream.

    The returned callable takes an iterable and yields its n-th, 2n-th,
    3n-th, ... items (counting from 1).
    """

    def consume(s):
        # Skip the first n - 1 items, then step through the rest n at a time.
        selected = itertools.islice(s, n - 1, None, n)
        yield from selected

    return consume
def echo(s):
    """Print every item of *s* as it passes through, then yield it unchanged."""
    for item in s:
        # Print first, so the item is shown before the consumer receives it.
        print(item)
        yield item
def average(s):
    """Return the arithmetic mean of the items in *s*.

    *s* may be any iterable. Sized containers (list, tuple, range, ...) are
    used as they are; one-shot iterators and generators, which have no
    ``len()``, are first materialized into a list so they can be both summed
    and counted.

    Raises:
        ZeroDivisionError: if *s* contains no items.
    """
    try:
        count = len(s)
    except TypeError:
        # No len(): drain the iterator once so the sum and the count agree.
        s = list(s)
        count = len(s)
    return sum(s) / count
def collect(s):
    """Return *s* unchanged.

    This is a plain function, not a generator function.
    """
    collected = s
    return collected
# Demo 1: echo prints 0-9 as the values stream through; because average is not
# a generator function, Pipe drains the echo generator into a list first, and
# print then shows the mean of that list.
Pipe(range(10)) | echo | average | print
# Demo 2: every_nth(2) keeps the 2nd, 4th and 6th items; Pipe turns the
# resulting generator into a list before handing it to print.
Pipe([1, 2, 3, 4, 5, 6]) | every_nth(2) | print
# Demo 3: prints the multiples of 3 one by one.
# NOTE: counter() never stops and collect is not a generator function, so Pipe
# calls list() on an endless generator -- this statement never returns and the
# list being built keeps growing while it runs.
Pipe(counter()) | every_nth(3) | echo | collect
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment