Skip to content

Instantly share code, notes, and snippets.

@JayBazuzi
Last active September 12, 2020 18:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save JayBazuzi/7fcac6f7d2a637901b5873b464522fdd to your computer and use it in GitHub Desktop.
Save JayBazuzi/7fcac6f7d2a637901b5873b464522fdd to your computer and use it in GitHub Desktop.
Pipelines in Python proof-of-concept
#!/usr/bin/env python3
from typing import Callable
import unittest
import typing
import abc
import typing
class Tests(unittest.TestCase):
    """End-to-end smoke test for the pipeline classes in this file."""

    def test_canary(self) -> None:
        """A value sent into an input pipe arrives at the collector tripled."""

        def triple(value: int) -> int:
            return value * 3

        # Wire up: source -> triple -> collector.
        source = InputPipe[int]("age")
        tripled = source.process_function(triple)
        collector = tripled.collect()

        source.send(7)

        self.assertEqual(21, collector.value)
# Type of the values a pipeline stage receives (see Listener.receive).
TInput = typing.TypeVar("TInput")
# Type of the values a pipeline stage sends onward (see Sender.send).
TOutput = typing.TypeVar("TOutput")
class Listener(typing.Generic[TInput], abc.ABC):
    """Abstract receiving end of a pipeline stage.

    Concrete subclasses must override :meth:`receive`; a class that does not
    cannot be instantiated.
    """

    @abc.abstractmethod
    def receive(self, value: TInput) -> None:
        """Handle one incoming ``value``.

        Raises:
            NotImplementedError: If reached, e.g. through ``super().receive()``.
        """
        # An explicit raise (rather than ``assert``) keeps the guard active
        # even when Python runs with assertions disabled (``-O``).
        raise NotImplementedError
class Collector(Listener[TInput]):
    """Listener that keeps hold of the most recent value it was given.

    The ``value`` attribute is only assigned by :meth:`receive`; reading it
    before anything has been received raises ``AttributeError``.
    """

    # Last value passed to ``receive``; not set until the first call.
    value: TInput

    def __init__(self) -> None:
        super().__init__()

    def receive(self, value: TInput) -> None:
        """Remember ``value``, overwriting any previously stored one."""
        self.value = value
class Sender(typing.Generic[TOutput], abc.ABC):
    """Emitting end of a pipeline stage.

    Keeps a list of listeners and passes every sent value to each of them.
    """

    # Result type of the function given to ``process_function``.
    _TOutput2 = typing.TypeVar("_TOutput2")

    def __init__(self) -> None:
        self._listeners: typing.List[Listener[TOutput]] = []

    def send(self, value: TOutput) -> None:
        """Pass ``value`` to every registered listener, in registration order."""
        for subscriber in self._listeners:
            subscriber.receive(value)

    def _add_listener(self, listener: Listener[TOutput]) -> typing.Any:
        """Register ``listener`` and return that same object."""
        self._listeners.append(listener)
        return listener

    def process_function(self, function: typing.Callable[[TOutput], _TOutput2]) -> "FunctionPipe[TOutput, _TOutput2]":
        """Attach a stage that applies ``function`` to each value sent from here.

        Returns:
            The newly attached ``FunctionPipe``, so further stages can be
            chained onto it.
        """
        stage = FunctionPipe(function)
        self._add_listener(stage)
        return stage

    def collect(self) -> Collector[TOutput]:
        """Attach a ``Collector`` to this sender and return it."""
        sink = Collector[TOutput]()
        self._add_listener(sink)
        return sink
class InputPipe(Sender[TOutput]):
    """Named sender used as the entry point of a pipeline.

    Values are pushed in by calling the inherited ``send`` method.
    """

    def __init__(self, name: str):
        """Create an input pipe with no listeners and store ``name`` on it."""
        super().__init__()
        self.name = name
class FunctionPipe(Listener[TInput], Sender[TOutput]):
    """Stage that applies a function to each received value and sends the result on."""

    def __init__(self, function: typing.Callable[[TInput], TOutput]):
        """Create the stage around ``function``, the per-value transformation."""
        super().__init__()
        self._function = function

    def receive(self, value: TInput) -> None:
        """Transform ``value`` and send the outcome to this stage's listeners."""
        transformed = self._function(value)
        self.send(transformed)
# Run the unittest suite when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment