Skip to content

Instantly share code, notes, and snippets.

@yasuabe
Last active May 3, 2020 08:46
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yasuabe/6fcbbdd77beec0a6f2b85a5b18d2dc59 to your computer and use it in GitHub Desktop.
Save yasuabe/6fcbbdd77beec0a6f2b85a5b18d2dc59 to your computer and use it in GitHub Desktop.
fs2.concurrent Balance and Broadcast demo. To run these, uncomment ConcurrentyDemoApp.
package qiita
import cats.effect._
import cats.syntax.functor._
import fs2.{Pipe, Stream, io, text}
import scala.concurrent.duration._
import scala.language.postfixOps
trait ConcurrencyDemoApp extends IOApp {
def run(args: List[String]): IO[ExitCode] =
Stream.resource(Blocker[IO])
.flatMap(implicit bl => stream[IO])
.compile.drain
.as(ExitCode.Success)
def stream[F[_] : Concurrent : Timer : ContextShift](implicit bl: Blocker): Stream[F, Unit]
}
object BalanceAndBroadcastDemo {
def readlineStream[F[_]: Sync : ContextShift](implicit bl: Blocker): Stream[F, String] =
io.stdin[F](4096, bl)
.through(text.utf8Decode)
.through(text.lines)
.map(_.trim)
.filter(_.nonEmpty)
.takeWhile(_ != ":q", takeFailure = true)
def printlnSink[F[_]: Sync : ContextShift](implicit bl: Blocker): Pipe[F, String, Unit] =
_.map(s => s"$s\n")
.through(text.utf8Encode)
.through(io.stdout[F](bl))
def worker[F[_]: Sync: ContextShift: Timer](id: String, sec: Int)
(implicit bl: Blocker): Pipe[F, String, Int] = _ flatMap { s =>
val sleep = Stream.sleep_[F](sec.second)
val print = Stream(s"worker#$id processing '$s' (${sec}s)") through printlnSink
(sleep ++ print) zipRight Stream(s).map(_.length)
}
}
import BalanceAndBroadcastDemo._
object BalanceDemoApp extends ConcurrencyDemoApp {
def stream[F[_] : Concurrent : Timer : ContextShift](implicit bl: Blocker): Stream[F, Unit] =
readlineStream[F]
.balanceThrough(1)(worker("A", 1), worker("B", 2), worker("C", 3))
.map(n => s"length: $n")
.through(printlnSink)
}
object BroadcastDemoApp extends ConcurrencyDemoApp {
def stream[F[_] : Concurrent : Timer : ContextShift](implicit bl: Blocker): Stream[F, Unit] =
readlineStream[F]
.broadcastThrough(worker("A", 1), worker("B", 2), worker("C", 3))
.map(n => s"length: $n")
.through(printlnSink)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment