Skip to content

Instantly share code, notes, and snippets.

@yasuabe
Last active Sep 22, 2019
Embed
What would you like to do?
Demo of fs2's concurrency `Topic`. To run it, uncomment ConcurrencyDemoApp and the package object, and move them to an appropriate place.
package qiita
import fs2.concurrent.{SignallingRef, Topic}
import fs2.{INothing, io, text}
import scala.concurrent.duration._
import scala.language.postfixOps
/** Events exchanged between the publisher and the subscribers through the topic. */
sealed trait Event

/** Initial value of the topic. */
case object Start extends Event

/** A line of user input carried as an event.
  *
  * @param value the text to be processed by subscribers
  */
final case class Text(value: String) extends Event

/** Signals that processing should stop. */
case object Quit extends Event
import java.util.concurrent.{ExecutorService, Executors}
import cats.effect._
import cats.syntax.functor._
import fs2.Stream
import scala.concurrent.ExecutionContext
/** Syntax helpers shared by the concurrency demos. */
object concurrency {

  /** Adds `.eval` to any effect value so it can be lifted into a stream inline.
    *
    * @param fo the effect to lift
    */
  implicit class StreamOps[F[_], O](val fo: F[O]) extends AnyVal {

    /** Wraps `fo` with `Stream.eval`. */
    def eval: Stream[F, O] =
      Stream.eval[F, O](fo)
  }
}
import concurrency._
/** Base `IOApp` for the concurrency demos.
  *
  * Supplies a `Blocker` backed by a two-thread fixed pool and runs the stream
  * defined by the concrete demo until it completes.
  */
trait ConcurrencyDemoApp extends IOApp {

  // Fixed pool of 2 threads wrapped as an ExecutionContext; the pool is shut down on release.
  private val blockingEC =
    Resource.make(
      IO(ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2)))
    )(pool => IO(pool.shutdown()))

  /** Acquires the blocking pool, runs `stream[IO]` to completion and reports success. */
  def run(args: List[String]): IO[ExitCode] = {
    val blockers: Stream[IO, Blocker] =
      Stream.resource(blockingEC).map(pool => Blocker.liftExecutionContext(pool))

    val program: Stream[IO, Unit] = blockers.flatMap { blocker =>
      // Re-expose the blocker implicitly so that `stream` can pick it up.
      implicit val bl: Blocker = blocker
      stream[IO]
    }

    program.compile.drain.as(ExitCode.Success)
  }

  /** The demo itself, to be provided by each concrete application.
    *
    * @param bl blocker made available for blocking operations
    */
  def stream[F[_] : Concurrent : Timer : ContextShift](implicit bl: Blocker): Stream[F, Unit]
}
/** Demo of an fs2 `Topic`: lines typed on stdin are published as [[Event]]s and
  * consumed by three subscribers that print to stdout, each after its own delay.
  */
object TopicDemoApp extends ConcurrencyDemoApp {

  /** Input line that is translated into a [[Quit]] event and ends the stdin stream. */
  private final val QuitCommand = ":q"

  /** Converts a line of input into the event to publish. */
  private def toEvent(line: String): Event =
    if (line == QuitCommand) Quit else Text(line)

  /** Trimmed, non-empty lines decoded as UTF-8 from stdin.
    *
    * The stream ends after emitting the first line equal to `":q"` (that line is
    * still emitted, because of `takeFailure = true`).
    *
    * @param bl blocker handed to `io.stdin`
    */
  def stdinStream[F[_]: Sync : ContextShift](implicit bl: Blocker): Stream[F, String] =
    io.stdin[F](4096, bl)
      .through(text.utf8Decode)
      .through(text.lines)
      .map(_.trim)
      .filter(_.nonEmpty)
      .takeWhile(_ != QuitCommand, takeFailure = true)

  /** Writes `line` followed by a newline to stdout, encoded as UTF-8.
    *
    * @param line the text to print
    * @param bl   blocker handed to `io.stdout`
    */
  def printlnStream[F[_]: Sync : ContextShift](line: String)(implicit bl: Blocker): Stream[F, Unit] =
    Stream(s"$line\n")
      .through(text.utf8Encode)
      .through(io.stdout[F](bl))

  /** Wires a stdin-driven publisher and three subscribers around a single topic.
    *
    * @param topic        topic through which events are distributed
    * @param haltWhenTrue signal that interrupts the publisher once it becomes `true`
    * @param blocker      blocker used for the stdin/stdout streams
    */
  class EventService[F[_] : ContextShift : Timer : Concurrent]
  (topic: Topic[F, Event], haltWhenTrue: SignallingRef[F, Boolean])(implicit blocker: Blocker) {

    /** Publishes each stdin line to the topic (`":q"` as [[Quit]], anything else
      * as [[Text]]) until `haltWhenTrue` interrupts it or stdin ends.
      */
    def publisher: Stream[F, Unit] =
      stdinStream[F]
        .map(toEvent)
        .evalMap(event => topic.publish1(event))
        .interruptWhen(haltWhenTrue)

    /** Subscribes to the topic (`maxQueued = 1`) and reacts to every event.
      *
      * @param id label printed in this subscriber's messages
      * @param d  delay inserted before a [[Text]] event is reported
      */
    def subscriber(id: String, d: FiniteDuration): Stream[F, Unit] =
      topic.subscribe(maxQueued = 1).flatMap {
        case Start       => printlnStream(s"#$id started")
        case Text(value) => Stream.sleep_[F](d) ++ printlnStream(s"#$id processing text event: $value")
        case Quit        => Stream.eval(haltWhenTrue.set(true))
      }

    /**
      *                                       / [subscriberA] >>> [stdout]
      * [stdin] >>> [publisher] >>> [topic] --- [subscriberB] >>> [stdout]
      *                                       \ [subscriberC] >>> [stdout]
      *
      * Runs the publisher in the foreground with the three subscribers running
      * concurrently in the background, discarding all output values.
      */
    def start: Stream[F, INothing] = {
      val subscribers: Stream[F, Unit] =
        Stream(("A", 1.second), ("B", 2.seconds), ("C", 3.seconds))
          .map { case (id, delay) => subscriber(id, delay) }
          .parJoin(3)

      // A single stream needs no parJoin wrapper; drain it directly.
      publisher.concurrently(subscribers).drain
    }
  }

  /** Creates the topic (initial value [[Start]]) and the halt signal (initially
    * `false`), then runs an [[EventService]] built from them.
    *
    * @param bl blocker passed on to the service
    */
  override def stream[F[_] : Concurrent : Timer : ContextShift](implicit bl: Blocker): Stream[F, Unit] =
    for {
      topic  <- Stream.eval(Topic[F, Event](Start))
      signal <- Stream.eval(SignallingRef[F, Boolean](false))
      _      <- new EventService[F](topic, signal).start
    } yield ()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment