Skip to content

Instantly share code, notes, and snippets.

@d6y
Created February 28, 2017 12:31
Show Gist options
  • Save d6y/50db301a5359d35f1951767404a94c48 to your computer and use it in GitHub Desktop.
Save d6y/50db301a5359d35f1951767404a94c48 to your computer and use it in GitHub Desktop.
Two channels
import scalaz.stream.{Exchange, Process, Sink}
import scalaz.stream.async.topic
import scalaz.concurrent.Task
import scalaz.stream.async.mutable.Topic
/** Demo of two scalaz-stream topics wired together.
  *
  * Everything is published to the `messages` topic. One subscription forwards
  * messages recognised by `isAdmin` to the `admin` topic, another prints all
  * remaining messages. A third process prints whatever arrives on `admin`.
  *
  * NOTE(review): the three subscribing processes are started asynchronously and
  * nothing in this object closes either topic — confirm whether that is the
  * intended lifecycle.
  */
object TopicExplore {

  /** A message, identified only by its text. */
  case class Message(value: String)

  /** Topic that receives every published message. */
  val messages: Topic[Message] = topic[Message]()

  /** Topic that receives only the messages forwarded by `adminSubscriber`. */
  val admin: Topic[Message] = topic[Message]()

  /** Sink used by the printing processes below. */
  val console: Sink[Task, String] = scalaz.stream.io.stdOutLines

  /** Formats each message seen on the `admin` topic and sends it to `console`. */
  val adminWatcher: Process[Task, Unit] =
    admin.subscribe
      .map(told => s"Admin was told: $told")
      .to(console)

  /** True only for the message whose text is exactly "admin". */
  def isAdmin(m: Message): Boolean = m == Message("admin")

  /** A subscription to the `messages` topic, shared by the two processes below. */
  val subscriber: Process[Task, Message] = messages.subscribe

  /** Forwards admin messages from `messages` on to the `admin` topic. */
  val adminSubscriber: Process[Task, Unit] =
    subscriber
      .filter(isAdmin)
      .to(admin.publish)

  /** Formats every non-admin message from `messages` and sends it to `console`. */
  val regularSubscriber: Process[Task, Unit] =
    subscriber
      .filter(msg => !isAdmin(msg))
      .map(seen => s"Non admin saw: $seen")
      .to(console)

  /** Sink that publishes onto the `messages` topic. */
  val publisher: Sink[Task, Message] = messages.publish

  /** Starts the three subscribing processes, then publishes a fixed list of messages. */
  def main(args: Array[String]): Unit = {
    // Kick off the subscribing processes in this order. `println` is passed only
    // because unsafePerformAsync requires a callback for the result.
    Seq(adminSubscriber, regularSubscriber, adminWatcher)
      .foreach(_.run.unsafePerformAsync(println))

    // Publish a handful of messages, exactly one of which is the admin message.
    val outgoing: Seq[Message] = List(
      Message("hello"),
      Message("how are you?"),
      Message("admin"),
      Message("well, I'll be off")
    )

    Process.emitAll(outgoing).to(publisher).run.unsafePerformSync
  }
}
@d6y
Copy link
Author

d6y commented Feb 28, 2017

Output:

Non admin saw: Message(hello)
Non admin saw: Message(how are you?)
Non admin saw: Message(well, I'll be off)
Admin was told: Message(admin)
[success] Total time: 0 s, completed 28-Feb-2017 12:31:04
39. Waiting for source changes... (press enter to interrupt)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment