Skip to content

Instantly share code, notes, and snippets.

@yasuabe
Last active September 23, 2019 12:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yasuabe/d5f5b3809eeef71950478fac0e51c406 to your computer and use it in GitHub Desktop.
Save yasuabe/d5f5b3809eeef71950478fac0e51c406 to your computer and use it in GitHub Desktop.
fs2 queue sample in which stdin and timer streams enqueue data and dequeuer side prints to stdout
package qiita
import cats.effect._
import cats.syntax.functor._
import fs2.concurrent.Queue
import fs2.{Pipe, Stream, io, text}
import scala.concurrent.duration._
import scala.language.postfixOps
/** Syntax helpers shared by the concurrency demos. */
object concurrency {

  /**
    * Adds `.eval` to any effect value so that it can be lifted into a
    * single-element stream with postfix-style syntax.
    *
    * @param fo the effect to lift
    */
  implicit class StreamOps[F[_], O](val fo: F[O]) extends AnyVal {

    /** Lifts the wrapped effect into a stream that evaluates it and emits its result. */
    def eval: Stream[F, O] =
      Stream.eval[F, O](fo)
  }
}
import concurrency._
/**
  * Base trait for the demo applications: acquires a `Blocker`, runs the
  * stream supplied by the concrete demo to completion, and exits successfully.
  */
trait ConcurrencyDemoApp extends IOApp {

  /**
    * Entry point: builds the demo stream with a `Blocker` in implicit scope,
    * drains it, and reports `ExitCode.Success`.
    *
    * @param args command-line arguments (not used)
    */
  def run(args: List[String]): IO[ExitCode] = {
    // The Blocker is managed as a stream resource, so it is released when the stream ends.
    val program: Stream[IO, Unit] =
      Stream.resource(Blocker[IO]).flatMap { implicit bl => stream[IO] }

    program.compile.drain.as(ExitCode.Success)
  }

  /**
    * The demo pipeline to run; implemented by each concrete application.
    *
    * @param bl blocker available for blocking I/O inside the stream
    */
  def stream[F[_] : Concurrent : Timer : ContextShift](implicit bl: Blocker): Stream[F, Unit]
}
/*
<stdin> >>> [string manipulation] >>>>>> [enqueue]>>─┐
                                                     ├>> [dequeue] >> [stdout] >>─┐
<timer> >>> [retrieve current second] >> [enqueue]>>─┘                            ├>> end
                                                              [sleep 15 sec] >>>>─┘
*/
/**
  * Queue demo: two producers (lines typed on stdin and a periodic timer)
  * enqueue strings onto one shared queue while a single consumer dequeues
  * them and writes them to stdout. The pipeline runs alongside a 15-second sleep.
  */
object QueueTestApp extends ConcurrencyDemoApp {

  /**
    * Lines read from standard input, trimmed, with empty lines dropped.
    *
    * @param bl blocker passed to `io.stdin` for the reads
    */
  def stdinStream[F[_] : Sync : ContextShift](bl: Blocker): Stream[F, String] =
    io.stdin[F](4096, bl)
      .through(text.utf8Decode)
      .through(text.lines)
      .map(_.trim)
      .filter(_.nonEmpty)

  /**
    * Emits, once per 2-second tick, the current wall-clock time in epoch
    * seconds modulo 60.
    *
    * The time is read from the `Timer`'s clock rather than from
    * `System.currentTimeMillis`, so the delay and the time source both come
    * from the same `Timer` instance supplied by the caller.
    *
    * @param F kept in the signature for source compatibility with existing callers
    */
  def timerStream[F[_] : Timer](implicit F: Concurrent[F]): Stream[F, Long] = {
    val ticks: Stream[F, Unit] = Stream.fixedDelay[F](2.seconds)
    val epochSeconds: Stream[F, Long] = Stream.eval(Timer[F].clock.realTime(SECONDS)).repeat
    // zipRight pulls one clock reading per tick and discards the tick itself.
    (ticks zipRight epochSeconds).map(_ % 60)
  }

  /**
    * Wires both producers and the consumer to the given queue and runs the
    * three streams concurrently.
    *
    * @param q  queue shared by the producers and the consumer
    * @param bl blocker used for the stdin reads and stdout writes
    */
  def join[F[_]: ContextShift : Timer](q: Queue[F, String])
                                      (implicit F: Concurrent[F], bl: Blocker): Stream[F, Unit] = {
    // Producers: tag each element with its origin and enqueue it.
    val timerSink: Pipe[F, Long, Unit] = _.evalMap(n => q.enqueue1(s"$n from timer\n"))
    val stdinSink: Pipe[F, String, Unit] = _.evalMap(s => q.enqueue1(s"$s from stdin\n"))
    // Consumer: encode as UTF-8 and write to stdout.
    val stdoutSink: Pipe[F, String, Unit] = s => text.utf8Encode(s) through io.stdout[F](bl)

    val enq1: Stream[F, Unit] = timerStream[F] through timerSink
    val enq2: Stream[F, Unit] = stdinStream[F](bl) through stdinSink
    val deq: Stream[F, Unit] = q.dequeue through stdoutSink

    // All three streams must be allowed to run at once, hence maxOpen = 3.
    Stream(enq1, enq2, deq).parJoin(3)
  }

  /**
    * The demo pipeline: creates a queue bounded at 100 elements, then runs
    * `join` in the background of a 15-second sleep.
    *
    * @param bl blocker forwarded to `join`
    */
  def stream[F[_] : Concurrent : Timer : ContextShift](implicit bl: Blocker): Stream[F, Unit] = for {
    q <- Queue.bounded[F, String](100).eval
    _ <- Stream.sleep_[F](15.seconds) concurrently join(q).drain
  } yield ()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment