Last active
August 29, 2015 14:20
-
-
Save frohoff/ef5f032b72eadfa61da9 to your computer and use it in GitHub Desktop.
monadish reactive flow combinators
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.frohoff.flow | |
import scala.collection.mutable.Buffer | |
import Flow._ | |
/**
 * Small runnable demo that wires up each `Flow` combinator once and prints
 * a few results so the behaviour can be eyeballed from the console.
 */
object Test extends App {
  // Identity flow: every input is handed to the sink unchanged.
  val f: Flow[Int, Int] = Flow[Int]

  // Mapping with plain functions.
  val f2: Flow[Int, String] = f.map(n => n.toHexString)
  val f3: Flow[Int, Option[Int]] = f.map(n => Option(n))
  // f3.flatten // doesn't compile yet

  // flatMap accepts anything viewable as a Traversable, here an Option.
  val f4: Flow[Int, Int] = f.flatMap(n => Option(n))

  // Closing a flow with a sink yields an ordinary side-effecting function.
  val f5: Int => Unit = f3.sink(o => println(o))

  // Wrap each value in a Seq, then flatten it back out again.
  val f6: Flow[Int, Seq[String]] = f2.map(s => Seq(s))
  val f7: Flow[Int, String] = f6.flatten

  // For-comprehension form: only the input 2 passes the guard, emitting "3".
  val f8: Flow[Int, String] =
    for {
      x: Int <- f
      y: Int = x + 1
      if x == 2
    } yield y.toString

  // Drive the flow directly with an input and a sink.
  f8(2, s => println(s))

  val f9: Int => Unit = f8.sink(s => println(s))

  // lift collects everything emitted for one input into a sequence.
  val f10: Int => Seq[String] = f8.lift
  val x1: Seq[String] = f10(2)
  println(x1)
  // 0 fails the guard, so nothing is emitted and the sequence is empty.
  println(f10(0))

  f9(2)
}
/**
 * A push-style processing stage. Applying a flow to an input of type `A`
 * together with a sink causes zero or more values of type `B` to be handed
 * to that sink.
 *
 * The combinators below (`map`, `flatMap`, `filter`, `withFilter`) make a
 * flow usable in for-comprehensions. Each combinator returns a new flow
 * that wraps this one; nothing runs until the resulting flow is applied.
 *
 * @tparam A type of the input accepted by the flow
 * @tparam B type of the values emitted to the sink
 */
trait Flow[A, B] extends ((A, Sink[B]) => Unit) { self =>

  /** Transforms every emitted value with `f` before it reaches the sink. */
  def map[C](f: B => C): Flow[A, C] =
    new Flow[A, C] {
      def apply(in: A, sink: Sink[C]): Unit = self(in, f andThen sink)
    }

  /** Chains `flow` after this one: every value emitted here becomes an input of `flow`. */
  def map[C](flow: Flow[B, C]): Flow[A, C] =
    new Flow[A, C] {
      def apply(in: A, sink: Sink[C]): Unit = self(in, b => flow(b, sink))
    }

  /**
   * Expands every emitted value into a collection via `f` and emits each element.
   *
   * The implicit conversion is the explicit spelling of the former view bound
   * `T <% Traversable[C]`; it lets `f` return e.g. an `Option`.
   */
  def flatMap[C, T](f: B => T)(implicit asTraversable: T => Traversable[C]): Flow[A, C] =
    new Flow[A, C] {
      def apply(in: A, sink: Sink[C]): Unit =
        self(in, b => asTraversable(f(b)).foreach(sink))
    }

  /**
   * Chains `flow` after this one and emits the elements of every collection
   * that `flow` produces.
   */
  def flatMap[C, T](flow: Flow[B, T])(implicit asTraversable: T => Traversable[C]): Flow[A, C] =
    new Flow[A, C] {
      def apply(in: A, sink: Sink[C]): Unit =
        self(in, b => flow(b, t => asTraversable(t).foreach(sink)))
    }

  // TODO: support Option
  /** Emits the individual elements of every collection emitted by this flow. */
  def flatten[C](implicit ev: B <:< Traversable[C]): Flow[A, C] =
    new Flow[A, C] {
      def apply(in: A, sink: Sink[C]): Unit = self(in, b => ev(b).foreach(sink))
    }

  /** Forwards only the values for which `f` returns true. */
  def filter(f: B => Boolean): Flow[A, B] =
    new Flow[A, B] {
      def apply(in: A, sink: Sink[B]): Unit = self(in, b => if (f(b)) sink(b))
    }

  /**
   * Same as [[filter]]. For-comprehension guards are translated to
   * `withFilter`, so providing it avoids relying on the deprecated
   * fallback to `filter`.
   */
  def withFilter(f: B => Boolean): Flow[A, B] = filter(f)

  /** Fixes the sink, turning this flow into a plain side-effecting function of its input. */
  def sink(f: Sink[B]): Sink[A] = a => self(a, f)

  /**
   * Turns this flow into a function that returns the values emitted for one input.
   *
   * Only values emitted while the flow is being applied are captured. The
   * result is an immutable snapshot; the internal builder is never exposed.
   */
  def lift: A => Seq[B] = { (a: A) =>
    val collected = Vector.newBuilder[B]
    // Explicit unit result: `+=` returns the builder, but a Sink must return Unit.
    self(a, b => { collected += b; () })
    collected.result()
  }
}
/** Companion of the `Flow` trait: the sink type alias and the identity-flow factories. */
object Flow {

  /** A consumer of values of type `A`, called only for its side effect. */
  type Sink[A] = A => Unit

  /** The identity flow: passes each input straight to the sink. */
  def ident[A]: Flow[A, A] =
    new Flow[A, A] {
      def apply(in: A, sink: Sink[A]): Unit = sink(in)
    }

  /** Shorthand for `ident`, so that `Flow[Int]` reads as "a flow of Ints". */
  def apply[A]: Flow[A, A] = ident[A]
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment