Skip to content

Instantly share code, notes, and snippets.

@yasuabe
Last active September 22, 2019 17:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yasuabe/3bda54acbf91d54e8509f0946ce0e3e7 to your computer and use it in GitHub Desktop.
Save yasuabe/3bda54acbf91d54e8509f0946ce0e3e7 to your computer and use it in GitHub Desktop.
fs2 concurrency Topic demo. To run it, uncomment ConcurrencyDemoApp and the package object, then move them to an appropriate place.
package qiita
import fs2.concurrent.{SignallingRef, Topic}
import fs2.{INothing, io, text}
import scala.concurrent.duration._
import scala.language.postfixOps
/** Events that travel through the topic from the publisher to every subscriber. */
sealed trait Event

/** Value the topic is created with; a subscriber reacts by printing that it has started. */
case object Start extends Event

/**
 * A non-empty line read from stdin.
 *
 * Declared `final` so the sealed hierarchy cannot be widened by subclassing,
 * which keeps pattern matches on [[Event]] exhaustive.
 *
 * @param value the trimmed line to hand to the subscribers
 */
final case class Text(value: String) extends Event

/** Published when the user types `:q`; a subscriber reacts by raising the halt signal. */
case object Quit extends Event
import java.util.concurrent.{ExecutorService, Executors}
import cats.effect._
import cats.syntax.functor._
import fs2.Stream
import scala.concurrent.ExecutionContext
/** Syntax helpers shared by the concurrency demos. */
object concurrency {

  /**
   * Adds an `.eval` method to any effect value `F[O]`.
   *
   * @param fo the effect to be lifted into a stream
   */
  implicit class StreamOps[F[_], O](val fo: F[O]) extends AnyVal {

    /** Lifts the wrapped effect into a stream by delegating to `Stream.eval`. */
    def eval: Stream[F, O] =
      Stream.eval[F, O](fo)
  }
}
import concurrency._
/**
 * Base for the concurrency demos: provides a `Blocker` backed by a dedicated
 * two-thread pool and runs the demo's [[stream]] to completion.
 */
trait ConcurrencyDemoApp extends IOApp {

  /** Fixed pool of two threads, shut down when the resource is released. */
  private val blockingEC =
    Resource.make(
      IO(ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2)))
    )((pool: ExecutorService) => IO(pool.shutdown()))

  /**
   * Acquires the blocking pool, runs [[stream]] in `IO` with a `Blocker`
   * lifted from that pool, drains it and reports success.
   *
   * @param args command-line arguments (not used)
   * @return `ExitCode.Success` once the stream has been drained
   */
  def run(args: List[String]): IO[ExitCode] = {
    val demo: Stream[IO, Unit] =
      Stream.resource(blockingEC).flatMap { pool =>
        implicit val bl: Blocker = Blocker.liftExecutionContext(pool)
        stream[IO]
      }
    demo.compile.drain.as(ExitCode.Success)
  }

  /**
   * The demo itself, supplied by the concrete application.
   *
   * @param bl blocker handed to the demo by [[run]]
   */
  def stream[F[_] : Concurrent : Timer : ContextShift](implicit bl: Blocker): Stream[F, Unit]
}
/** Demo that fans lines typed on stdin out to three subscribers through an fs2 `Topic`. */
object TopicDemoApp extends ConcurrencyDemoApp {

  /**
   * Lines read from stdin: UTF-8 decoded, split into lines, trimmed, with
   * empty lines dropped. The stream ends after emitting the first `:q` line
   * (that line is included because of `takeFailure = true`).
   *
   * @param bl blocker passed to `io.stdin`
   */
  def stdinStream[F[_]: Sync : ContextShift](implicit bl: Blocker): Stream[F, String] = {
    val rawLines = io.stdin[F](4096, bl).through(text.utf8Decode).through(text.lines)
    val commands = rawLines.map(_.trim).filter(_.nonEmpty)
    commands.takeWhile(_ != ":q", takeFailure = true)
  }

  /**
   * Writes `line` followed by a newline to stdout, UTF-8 encoded.
   *
   * @param line text to print
   * @param bl   blocker passed to `io.stdout`
   */
  def printlnStream[F[_]: Sync : ContextShift](line: String)(implicit bl: Blocker): Stream[F, Unit] = {
    val encoded = Stream.emit(s"$line\n").through(text.utf8Encode)
    encoded.through(io.stdout[F](bl))
  }

  /**
   * Wires one stdin-driven publisher and three subscribers around a topic.
   *
   * @param topic        topic the events are published to and subscribed from
   * @param haltWhenTrue signal that interrupts the publisher once it becomes `true`
   * @param ec           blocker used for the stdin/stdout streams
   */
  class EventService[F[_] : ContextShift : Timer : Concurrent]
  (topic: Topic[F, Event], haltWhenTrue: SignallingRef[F, Boolean])(implicit ec: Blocker) {

    /**
     * Publishes one event per stdin line: `Quit` for `:q`, `Text` for anything
     * else. Interrupted when `haltWhenTrue` is set.
     */
    def publisher: Stream[F, Unit] = {
      val publishing = stdinStream[F].flatMap { line =>
        val event: Event = if (line == ":q") Quit else Text(line)
        Stream.eval(topic.publish1(event))
      }
      publishing.interruptWhen(haltWhenTrue)
    }

    /**
     * Subscribes to the topic with a queue bound of one and reacts to each event.
     *
     * @param id label printed in front of every message of this subscriber
     * @param d  delay applied before a text event is reported
     */
    def subscriber(id: String, d: FiniteDuration): Stream[F, Unit] = {
      def react(event: Event): Stream[F, Unit] = event match {
        case Start =>
          printlnStream[F](s"#$id started")
        case Text(body) =>
          // Sleep first, then report the text.
          Stream.sleep_[F](d) ++ printlnStream[F](s"#$id processing text event: $body")
        case Quit =>
          Stream.eval(haltWhenTrue.set(true))
      }
      topic.subscribe(maxQueued = 1).flatMap(react)
    }

    /**
     * Runs the publisher with subscribers A, B and C (1, 2 and 3 second delays)
     * in the background, discarding all output.
     *
     * {{{
     *                                      / [subscriberA] >>> [stdout]
     * [stdin] >>> [publisher] >>> [topic] -- [subscriberB] >>> [stdout]
     *                                      \ [subscriberC] >>> [stdout]
     * }}}
     *
     * NOTE(review): per the fs2 docs, `concurrently` terminates the background
     * stream when the foreground one finishes, so subscribers may be cut off
     * while still sleeping once the publisher ends after `:q` - confirm this
     * is the intended demo behaviour.
     */
    def start: Stream[F, INothing] = {
      val subscribers: Stream[F, Unit] =
        Stream(("A", 1.second), ("B", 2.seconds), ("C", 3.seconds))
          .map { case (id, delay) => subscriber(id, delay) }
          .parJoin(3)
      // NOTE(review): the outer one-element stream + parJoin looks redundant - TODO confirm.
      Stream(publisher.concurrently(subscribers)).parJoin(3).drain
    }
  }

  /**
   * Creates the topic (initial event `Start`) and the halt signal (initially
   * `false`), then runs an [[EventService]] built from them.
   *
   * @param bl blocker forwarded to the service
   */
  override def stream[F[_] : Concurrent : Timer : ContextShift](implicit bl: Blocker): Stream[F, Unit] =
    Stream.eval(Topic[F, Event](Start)).flatMap { events =>
      Stream.eval(SignallingRef[F, Boolean](false)).flatMap { halt =>
        new EventService[F](events, halt).start.map(_ => ())
      }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment