Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Pipelines.scala
import scala.collection.mutable
trait Source[+A] {
def output(): A
}
trait Flow[A, B] extends Sink[A] with Source[B]
trait Sink[-A] {
def input(a: A): Unit
}
object Syntax {
implicit class Pipe[A](source: Source[A]) {
def to(sink: Sink[A]): Unit = {
sink.input(source.output())
}
def to[B](flow: Flow[A, B]): Source[B] = {
flow.input(source.output())
flow
}
}
}
object Stdout extends Sink[String] {
override def input(a: String): Unit = println(a)
}
class MyDataSource extends Source[String] {
private val stream = Stream.from(0).map(_.toString).map(_ + "hello").iterator
override def output(): String = stream.take(1).toSeq.head
}
object MyDataSource {
def apply(): MyDataSource = new MyDataSource()
}
class StringToUpper extends Flow[String, String] {
private val buffer = mutable.Buffer[String]()
override def input(a: String): Unit = buffer += a
override def output(): String = buffer.remove(0).toUpperCase()
}
object StringToUpper {
def apply(): StringToUpper = new StringToUpper()
}
import Syntax._
val source = MyDataSource()
val transform = StringToUpper()
for (_ <- 0 to 10) {
source to transform to Stdout
}
// run(
// Source(0 to 1000)
// |> Transform(a => a + 1)
// |> Sink(println)
// )
//Stream.from(0)
// .map(number => number.toString)
// .map(string => string.toUpperCase())
// .take(10)
// .foreach(println)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment