@d6y
Last active February 28, 2017 12:02
filtering
package foo
import scalaz.stream.{Exchange, Process, Sink}
import scalaz.stream.async.topic
import scalaz.concurrent.Task
import scalaz.stream.async.mutable.Topic
// Useful:
// - https://gist.github.com/djspiewak/d93a9c4983f63721c41c
// - https://www.youtube.com/watch?v=o3Siln85TJ4 (from 41 minutes in: sinks, channels, queues)
// - definition of stdOutLines is in: https://github.com/scalaz/scalaz-stream/blob/master/src/main/scala/scalaz/stream/io.scala
object TopicExplore {
case class Content(value: Int)
val messages = topic[Content]()
// A Process is "an effectful stream" of Content.
// Like a lazy list.
// The effect is a Task, and a Task wraps a Future[Throwable \/ A]
val subscriber: Process[Task, Content] = messages.subscribe
def isOdd(c: Content): Boolean = c.value % 2 != 0
val console: Sink[Task, String] = scalaz.stream.io.stdOutLines
val oddSubscriber = subscriber.filter(isOdd).map(c => s"I saw: ${c.value}").to(console)
// A Sink is a Process[Task, Content => Task[Unit]]
// So... an effectful stream of functions. A source(!) of functions.
val publisher: Sink[Task, Content] = messages.publish
def isBig(c: Content): Boolean = c.value > 10
// Although this works, it seems convoluted: I'm essentially attempting a filter(isBig) on the sink.
val bigPublisher = publisher.map { f =>
(content: Content) => if (isBig(content)) f(content) else Task.now(())
}
def main(args: Array[String]): Unit = {
// Start the subscriber running:
oddSubscriber.run.unsafePerformAsync(println)
// (println is passed only because unsafePerformAsync requires a callback for when the asynchronous result becomes available,
// which isn't relevant in our case.)
// Publish some numbers:
val xs: Seq[Content] = (1 to 21).map(Content.apply)
Process.emitAll(xs).to(bigPublisher).run.unsafePerformSync
}
}
/* output
[info] Running foo.TopicExplore
I saw: 11
I saw: 13
I saw: 15
I saw: 17
I saw: 19
I saw: 21
[success] Total time: 1 s, completed 28-Feb-2017 11:56:41
23. Waiting for source changes... (press enter to interrupt)
*/
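
The sink-wrapping filter in `bigPublisher` can probably be written more directly. The following is a minimal sketch, not a definitive answer, showing two options: filter the source before it reaches the sink, or pull the wrapping into a reusable helper. The names `FilterSinkSketch`, `filterSink`, `isLong` and `words` are hypothetical and not part of scalaz-stream. To stay self-contained it writes to `stdOutLines` rather than a topic, and it assumes the same scalaz-stream and scalaz versions as the gist.

```scala
import scalaz.concurrent.Task
import scalaz.stream.{Process, Sink}

object FilterSinkSketch {

  // Hypothetical helper (not a scalaz-stream API): wraps every function the
  // sink emits so that values failing `p` are dropped instead of written.
  def filterSink[A](sink: Sink[Task, A])(p: A => Boolean): Sink[Task, A] =
    sink.map(f => (a: A) => if (p(a)) f(a) else Task.now(()))

  def isLong(s: String): Boolean = s.length > 2

  val console: Sink[Task, String] = scalaz.stream.io.stdOutLines

  val words: Seq[String] = Seq("a", "bbb", "cc", "dddd")

  def main(args: Array[String]): Unit = {
    // Option 1: filter the source, leaving the sink untouched.
    Process.emitAll(words).filter(isLong).to(console).run.unsafePerformSync

    // Option 2: keep the source as-is and filter at the sink via the helper.
    Process.emitAll(words).to(filterSink(console)(isLong)).run.unsafePerformSync
  }
}
```

Option 1 is usually the simpler choice when you control the source. Option 2 matches the gist's approach and may be preferable when the sink is handed to code that builds its own source.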