fs2 concurrency Topic demo. To run it, uncomment ConcurrencyDemoApp and the package object, and move them to the appropriate place.
package qiita
import fs2.concurrent.{SignallingRef, Topic}
import fs2.{INothing, io, text}
import scala.concurrent.duration._
import scala.language.postfixOps
/** Events carried by the demo's topic. The hierarchy is sealed so matches on it are checked for exhaustiveness. */
sealed trait Event

/** Initial value of the topic; subscribers report that they have started when they see it. */
case object Start extends Event

/** A line of user input to be processed by the subscribers.
  *
  * @param value the text that was read
  */
final case class Text(value: String) extends Event

/** Published when the user enters ":q"; makes a subscriber raise the halt signal. */
case object Quit extends Event
import java.util.concurrent.{ExecutorService, Executors}
import cats.effect._
import cats.syntax.functor._
import fs2.Stream
import scala.concurrent.ExecutionContext
/** Syntax helpers used by the demo code in this file. */
object concurrency {
/** Extension syntax available on any effect value `F[O]`. */
implicit class StreamOps[F[_], O](val fo: F[O]) extends AnyVal {
/** Lifts the wrapped effect into a stream by passing it to `Stream.eval`. */
def eval: Stream[F, O] = Stream.eval(fo)
import concurrency._
/** Base trait for the demo applications: an `IOApp` whose concrete logic is supplied through `stream`. */
trait ConcurrencyDemoApp extends IOApp {
// Execution context for blocking work, backed by a fixed pool of two threads.
// NOTE(review): the expression that combines `acquire` and `release` is not visible here;
// `run` passes this value to `Stream.resource`, so it is presumably a `Resource` — confirm.
private val blockingEC = {
// Creates the thread pool and wraps it as an ExecutionContext, suspended in IO.
val acquire = IO(ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2)))
// Shuts the pool down, suspended in IO.
val release = (ec: ExecutorService) => IO(ec.shutdown())
/** Entry point: acquires `blockingEC` as a stream resource and lifts it into an implicit `Blocker`.
  * NOTE(review): the remainder of the body (presumably running `stream` and producing the exit code)
  * is not visible here — confirm.
  */
def run(args: List[String]): IO[ExitCode] =
Stream.resource(blockingEC).flatMap { ec =>
implicit val bl: Blocker = Blocker.liftExecutionContext(ec)
/** The demo logic, implemented by concrete apps.
  *
  * @tparam F effect type; needs `Concurrent`, `Timer` and `ContextShift` instances
  * @param bl blocker made available implicitly to the implementation
  */
def stream[F[_] : Concurrent : Timer : ContextShift](implicit bl: Blocker): Stream[F, Unit]
/** Demo app that publishes lines read from stdin to a `Topic` and lets several subscribers react to them. */
object TopicDemoApp extends ConcurrencyDemoApp {
/** Stream of input read from standard input via `io.stdin`, called with 4096 and the implicit blocker.
  *
  * Elements are taken while they differ from ":q"; `takeFailure = true` also keeps the first
  * element that fails that test, so ":q" itself is still emitted.
  *
  * NOTE(review): the declared element type is `String`, but no decoding or line-splitting step is
  * visible between `io.stdin` and `takeWhile` — presumably one is missing from view; confirm.
  *
  * @param bl blocker passed to `io.stdin`
  */
def stdinStream[F[_]: Sync : ContextShift](implicit bl: Blocker): Stream[F, String] =
io.stdin[F](4096, bl)
.takeWhile(_ != ":q", takeFailure = true)
/** Stream built from a single `line` of text.
  *
  * NOTE(review): the body is not visible here; judging by the name and the implicit `Blocker`,
  * it presumably writes `line` to standard output — TODO confirm.
  *
  * @param line the text to handle
  * @param bl   blocker available implicitly to the body
  */
def printlnStream[F[_]: Sync : ContextShift](line: String)(implicit bl: Blocker): Stream[F, Unit] =
/** Wires a stdin-driven publisher and several topic subscribers together.
  *
  * @param topic        topic that events are published to and subscribed from
  * @param haltWhenTrue signal set to `true` by a subscriber on `Quit`; it interrupts the publisher
  * @param ec           blocker passed implicitly to the stdin/stdout helper streams
  */
class EventService[F[_] : ContextShift : Timer : Concurrent]
(topic: Topic[F, Event], haltWhenTrue: SignallingRef[F, Boolean])(implicit ec: Blocker) {
/** Publishes every element of `stdinStream` to the topic: ":q" is sent as `Quit`,
  * anything else is wrapped in `Text`. The stream is interrupted once `haltWhenTrue` becomes true.
  */
def publisher: Stream[F, Unit] =
  stdinStream
    .flatMap { line =>
      // Translate the raw input into the event it stands for, then publish it.
      val event: Event = if (line == ":q") Quit else Text(s"$line")
      topic.publish1(event).eval
    }
    .interruptWhen(haltWhenTrue)
/** Subscribes to the topic (with `maxQueued = 1`) and reacts to each received event.
  *
  * @param id label included in the printed messages
  * @param d  delay slept before reporting each `Text` event
  */
def subscriber(id: String, d: FiniteDuration): Stream[F, Unit] = topic.subscribe(maxQueued = 1) flatMap {
// `Start` is the topic's initial value, so this announces that the subscriber is up.
case Start => printlnStream(s"#$id started")
// Sleep for `d`, then report the text. The pattern variable `text` shadows the imported `fs2.text` in this case.
case Text(text) => Stream.sleep_[F](d) ++ printlnStream(s"#$id processing text event: $text")
// Raise the halt signal, which the publisher uses as its interruption condition.
case Quit => haltWhenTrue.set(true).eval
*                                     / [subscriberA] >>> [stdout]
* [stdin] >>> [publisher] >>> [topic] - [subscriberB] >>> [stdout]
*                                     \ [subscriberC] >>> [stdout]
/** Runs the publisher concurrently with subscribers "A", "B" and "C" (delays of 1, 2 and 3 seconds)
  * and discards everything they emit.
  */
def start: Stream[F, INothing] = {
// NOTE(review): as visible here, the expression below yields a stream of subscriber streams rather
// than the annotated Stream[F, Unit]; a joining step is presumably missing from view — confirm.
val subscribers: Stream[F, Unit] =
Stream(("A", 1.second), ("B", 2.second), ("C", 3.second))
.map { case (id, d) => subscriber(id, d) }
// Postfix `drain` (enabled by the scala.language.postfixOps import) discards all emitted values.
Stream(publisher concurrently subscribers) parJoin 3 drain
/** Builds the demo: creates a topic seeded with `Start` and a halt signal initialised to `false`,
  * then runs an `EventService` over them, emitting a unit per element of the service's stream.
  *
  * @tparam F effect type; needs `Concurrent`, `Timer` and `ContextShift` instances
  * @param bl blocker handed implicitly to the `EventService`
  */
override def stream[F[_] : Concurrent : Timer : ContextShift](implicit bl: Blocker): Stream[F, Unit] =
  Topic[F, Event](Start).eval.flatMap { topic =>
    SignallingRef(false).eval.flatMap { signal =>
      new EventService[F](topic, signal).start.map(_ => ())
    }
  }
