Skip to content

Instantly share code, notes, and snippets.

@marko-asplund
Created August 1, 2019 12:11
Show Gist options
  • Save marko-asplund/6b1e9ce73dc55892200fb69e32d3df16 to your computer and use it in GitHub Desktop.
Save marko-asplund/6b1e9ce73dc55892200fb69e32d3df16 to your computer and use it in GitHub Desktop.
fs2 example code from underscore ported to fs2 v1.0.5 and cats-effect 1.4.0
import cats.implicits._
import cats.effect.{ConcurrentEffect, Effect, ExitCode, IO, IOApp, Timer}
import fs2._
import fs2.concurrent.Queue
import scala.concurrent.duration._
import scala.concurrent.duration.MILLISECONDS
//
// fs2 example code from
// https://underscore.io/blog/posts/2018/03/20/fs2.html
// ported to fs2 v1.0.5 and cats-effect 1.4.0
//
/**
  * Demonstrates feeding an fs2 `Queue` from a timed producer stream while a
  * consumer stream concurrently dequeues a fixed number of elements.
  */
object Fs2SampleCode extends IOApp {

  /**
    * Producer stream: on every one-second tick, reads the real-time clock in
    * milliseconds and emits that value modulo 10000, rendered as a `String`.
    *
    * @tparam F effect type the stream runs in
    * @param timer supplies both the tick scheduling and the clock
    */
  def streamData[F[_] : Effect](implicit timer: Timer[F]): Stream[F, String] =
    Stream
      .awakeEvery[F](1.second)
      .evalMap { _ =>
        timer.clock.realTime(MILLISECONDS).map(millis => (millis % 10000).toString)
      }

  /**
    * Wraps the act of "run [[streamData]] and enqueue everything into `q`"
    * as a single-effect stream.
    *
    * @param q queue that receives the produced strings
    * @return a stream that evaluates the (non-terminating) enqueue effect
    */
  def enqueueData[F[_] : Timer](q: Queue[F, String])(implicit F: Effect[F]): Stream[F, Unit] =
    Stream.eval(
      streamData[F]
        .through(q.enqueue)
        .compile
        .drain
    )

  /**
    * Consumer stream: the first four elements dequeued from `q`.
    *
    * @param q queue to read from
    */
  def dequeueData[F[_]](q: Queue[F, String]): Stream[F, String] =
    q.dequeue.take(4)

  /**
    * Allocates a circular-buffer queue with capacity 5 and runs the consumer
    * with the producer running concurrently alongside it. The resulting stream
    * emits what the consumer emits; per fs2's `concurrently` contract the
    * producer is stopped once the consumer side finishes.
    */
  def withQueue[F[_] : ConcurrentEffect : Timer]: Stream[F, String] = {
    val queue: Stream[F, Queue[F, String]] =
      Stream.eval(Queue.circularBuffer[F, String](5))

    queue.flatMap { q =>
      val enqueueStream = enqueueData(q)
      val dequeueStream = dequeueData(q)
      dequeueStream.concurrently(enqueueStream)
    }
  }

  /**
    * Application entry point: collects the dequeued elements into a `Vector`,
    * prints them, and exits successfully.
    *
    * The whole program is described as one `IO` value and handed back to
    * `IOApp` to execute, rather than being run eagerly with `unsafeRunSync()`
    * while the result is still being constructed.
    */
  override def run(args: List[String]): IO[ExitCode] =
    for {
      resultQueue <- withQueue[IO].compile.toVector
      // Printing is a side effect, so it is suspended in IO as well.
      _ <- IO(println(s"Queue >> $resultQueue"))
    } yield ExitCode.Success
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment