Created
December 18, 2019 15:45
-
-
Save rz7d/c748caca87f5626aeb1277feb80b168b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Pypeliner for Python 3
#
# Usage
#
# [PROLOGUE]
#   _ | <Value>
#   <Value> | _
#
# [INTERMEDIATE OPERATIONS]
#   (pipeline) \
#     | "Mapping Function"  # applying mappings
#
# [TERMINAL OPERATIONS]
#   (pipeline) \
#     > "Terminal Operation"
#
import pathlib | |
from typing import Any, Callable, Generic, TypeVar | |
# Type variables for the generic annotations in this module.
T = TypeVar("T")  # type of the value wrapped by StreamPipeline
R = TypeVar("R")  # additional type variable (presumably a mapper's result type)
S = TypeVar("S")  # additional type variable; purpose not shown in this file
class StreamPipeline(Generic[T]):
    """Wrap one value so functions can be applied to it with operators.

    * ``pipeline | f``  maps the wrapped value through ``f``.
    * ``pipeline > f``  calls ``f`` on the wrapped value and keeps the pipeline.
    * ``pipeline <= y`` unwraps the value (``y`` is ignored).

    Note: ``>`` and ``<=`` are comparison operators, so Python evaluates
    ``p > f > g`` as the chained comparison ``(p > f) and (f > g)``; peeks
    therefore cannot be chained without extra parentheses.
    """

    def __init__(self, x: T) -> None:
        """Store ``x`` as the value carried by this pipeline stage."""
        self.x = x

    def __repr__(self) -> str:
        """Return a debugging representation showing the wrapped value."""
        return "{}({!r})".format(type(self).__name__, self.x)

    def __or__(self, mapper: "Callable[[T], R]") -> "StreamPipeline[R]":
        """
        Executes "map" operation.

        Calls ``mapper`` with the wrapped value and returns a new pipeline
        (of the same class as ``self``) wrapping the result.
        """
        return self.__class__(mapper(self.x))

    def __gt__(self, consumer: "Callable[[T], object]") -> "StreamPipeline[T]":
        """
        Executes "peek" operation.

        Calls ``consumer`` with the wrapped value, discards whatever it
        returns, and returns this same pipeline object.
        """
        consumer(self.x)
        return self

    def __le__(self, right: object) -> T:
        """
        Unwrap the value.

        ``right`` is ignored; the wrapped value is returned as-is.
        """
        return self.x
class Prologue:
    """Starting point that turns a plain value into a StreamPipeline.

    Both documented prologue spellings are supported: ``_ | value`` and
    ``value | _``. ``_ <= pipeline`` returns the pipeline's wrapped value.
    """

    def __or__(self, right) -> "StreamPipeline":
        """Start a pipeline from the right-hand operand: ``_ | value``."""
        return StreamPipeline(right)

    def __ror__(self, left) -> "StreamPipeline":
        """Start a pipeline from the left-hand operand: ``value | _``.

        Python calls this reflected hook when the left operand's own ``|``
        implementation is missing or returns ``NotImplemented`` for a
        Prologue, which is the case for ordinary values such as strings.
        """
        return StreamPipeline(left)

    def __call__(self, *args):
        """Ignore every argument and return this same object."""
        return self

    def __le__(self, right):
        """Unwrap: ``_ <= pipeline`` returns ``pipeline.x``."""
        return right.x
# Shared Prologue instance; `_ | value` wraps a value in a StreamPipeline and
# `_ <= pipeline` reads the wrapped value back out.
_ = Prologue()
if __name__ == "__main__":
    # NOTE(review): the first and third expressions were left unclosed in the
    # source listing. The `> print` terminal step and closing parenthesis are
    # reconstructed from the usage header ("> Terminal Operation") -- confirm
    # against the original file.

    # Demo 1: map a string twice, then pass the result to a terminal operation.
    (_
     | "hello"
     | str.upper
     | (lambda x: x.replace("ELL", "ell"))
     > print)

    # Demo 2: same mappings, but unwrap the final value with `_ <= pipeline`.
    hello = _ <= (_
                  | "hello"
                  | str.upper
                  | (lambda x: x.replace("ELL", "ell"))
                  )
    print(hello)

    # Demo 3: lazy `map` stages over a range, materialized with `list`.
    (_
     | range(0, 100)
     | (lambda c: map(lambda x: x + 1, c))
     | (lambda c: map(lambda x: x * 100, c))
     | list
     > print)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment