Skip to content

Instantly share code, notes, and snippets.

@frohoff
Last active August 29, 2015 14:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save frohoff/ef5f032b72eadfa61da9 to your computer and use it in GitHub Desktop.
Save frohoff/ef5f032b72eadfa61da9 to your computer and use it in GitHub Desktop.
monadish reactive flow combinators
package org.frohoff.flow
import scala.collection.mutable.Buffer
import Flow._
object Test extends App {
val f: Flow[Int,Int] = Flow[Int]
val f2: Flow[Int,String] = f.map(_.toHexString)
val f3: Flow[Int,Option[Int]] = f.map(Option(_))
//f3.flatten // doesn't compile yet
val f4: Flow[Int,Int] = f.flatMap(Option(_))
val f5: Int => Unit = f3.sink(println(_))
val f6: Flow[Int,Seq[String]] = f2.map(Seq(_))
val f7: Flow[Int,String] = f6.flatten
val f8: Flow[Int, String]
= for { x: Int <- f
y: Int = x+1
if x == 2 } yield y.toString()
f8(2, println(_))
val f9: Int => Unit = f8.sink(println(_))
val f10: Int => Seq[String] = f8.lift
val x1: Seq[String] = f10(2)
println(x1)
println(f10(0))
f9(2)
}
trait Flow[A, B] extends ((A, Sink[B]) => Unit) {
def map[C](f: B => C): Flow[A, C] = {
val self = this
new Flow[A, C] {
def apply(in: A, sink: Sink[C]) = self(in, f andThen sink)
}
}
def map[C](flow: Flow[B, C]): Flow[A, C] = {
val self = this
new Flow[A, C] {
def apply(in: A, sink: Sink[C]) = self(in, (b => flow(b, sink)))
}
}
def flatMap[C, T <% Traversable[C]](f: B => T): Flow[A, C] = {
val self = this
new Flow[A, C] {
def apply(in: A, sink: Sink[C]) = self(in, (b => f(b) foreach (sink)))
}
}
def flatMap[C, T <% Traversable[C]](flow: Flow[B, T]): Flow[A, C] = {
val self = this
new Flow[A, C] {
def apply(in: A, sink: Sink[C]) = self(in, (b => flow(b, _ foreach (sink))))
}
}
// TODO: support Option
def flatten[C](implicit ev: B <:< Traversable[C]): Flow[A, C] = flatMap (t => t)
def filter(f: B => Boolean): Flow[A, B] = {
val self = this
new Flow[A, B] {
def apply(in: A, sink: Sink[B]) = self(in, b => if(f(b)) sink(b))
}
}
def sink(f: Sink[B]): Sink[A] = this(_, f)
def lift: A => Seq[B] = {
val self = this
(a: A) => {
val accum = Buffer[B]()
self(a, accum.append(_))
accum
}
}
}
object Flow {
type Sink[A] = A => Unit
def apply[A] = ident[A]
def ident[A] = new Flow[A,A] {
def apply(in: A, sink: Sink[A]) = sink(in)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment