Last active
February 28, 2017 12:02
-
-
Save d6y/aa8560a39ea19193f40c40a3561a6fd7 to your computer and use it in GitHub Desktop.
filtering
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package foo | |
import scalaz.stream.{Exchange, Process, Sink} | |
import scalaz.stream.async.topic | |
import scalaz.concurrent.Task | |
import scalaz.stream.async.mutable.Topic | |
// Useful references:
// - https://gist.github.com/djspiewak/d93a9c4983f63721c41c
// - https://www.youtube.com/watch?v=o3Siln85TJ4 (from the 41-minute mark: sinks, channels, queues)
// - the definition of stdOutLines is in: https://github.com/scalaz/scalaz-stream/blob/master/src/main/scala/scalaz/stream/io.scala
/**
 * Exploration of a scalaz-stream `Topic`.
 *
 * One subscriber prints the odd values it sees on the topic, while the
 * publishing side only forwards "big" values (greater than 10). Running
 * `main` with the numbers 1 to 21 therefore prints the odd numbers 11..21.
 */
object TopicExplore {

  /** The payload that travels over the topic. */
  case class Content(value: Int)

  /** The topic shared by the publisher and the subscriber below. */
  val messages: Topic[Content] = topic[Content]()

  // A Process is "an effectful stream" of Content -- think of a lazy list.
  // The effect here is a Task, and a Task wraps a Future[Throwable \/ A].
  val subscriber: Process[Task, Content] = messages.subscribe

  /** True when the content's value is not divisible by two. */
  def isOdd(c: Content): Boolean = c.value % 2 != 0

  /** Sink that writes each incoming String as a line on standard out. */
  val console: Sink[Task, String] = scalaz.stream.io.stdOutLines

  /** The subscription pipeline: keep odd values, render them, print them. */
  val oddSubscriber: Process[Task, Unit] =
    subscriber
      .filter(content => isOdd(content))
      .map(content => s"I saw: ${content.value}")
      .to(console)

  // A Sink is a Process[Task, Content => Task[Unit]]:
  // an effectful stream of functions, i.e. a *source* of functions.
  val publisher: Sink[Task, Content] = messages.publish

  /** True when the content's value is strictly greater than 10. */
  def isBig(c: Content): Boolean = c.value > 10

  /**
   * Wraps a publish function so that it only forwards big content;
   * anything else completes immediately without publishing.
   */
  private def onlyBig(deliver: Content => Task[Unit]): Content => Task[Unit] =
    content =>
      if (isBig(content)) deliver(content)
      else Task.now(())

  // Effectively `filter(isBig)` applied on the sink side: every function
  // emitted by `publisher` is replaced by its guarded version.
  val bigPublisher: Sink[Task, Content] = publisher.map(onlyBig)

  /** Starts the subscriber, then publishes Content(1) to Content(21). */
  def main(args: Array[String]): Unit = {
    // Start the subscriber running. unsafePerformAsync needs a callback for
    // when the asynchronous result becomes available; that result is not
    // relevant in our case, so it is simply printed.
    // NOTE(review): nothing here waits for the subscriber to drain before
    // main returns -- confirm that is acceptable for how this is run.
    oddSubscriber.run.unsafePerformAsync(result => println(result))

    // Publish some numbers through the filtering sink, blocking until done.
    val xs: Seq[Content] = for (n <- 1 to 21) yield Content(n)
    Process.emitAll(xs).to(bigPublisher).run.unsafePerformSync
  }
}
/* output
[info] Running foo.TopicExplore
I saw: 11
I saw: 13
I saw: 15
I saw: 17
I saw: 19
I saw: 21
[success] Total time: 1 s, completed 28-Feb-2017 11:56:41
23. Waiting for source changes... (press enter to interrupt)
*/
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment