Skip to content

Instantly share code, notes, and snippets.

@chuwy
Created May 8, 2019 18:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save chuwy/39d1d236c3cae9dd31239d5502d5108a to your computer and use it in GitHub Desktop.
Save chuwy/39d1d236c3cae9dd31239d5502d5108a to your computer and use it in GitHub Desktop.
/**
 * Small fs2 / cats-effect demo: a producer emits a running counter every
 * 200 ms, prints the even values and pushes a message for every odd value
 * onto a bounded queue; a consumer drains that queue in time/size-bounded
 * groups and prints each group.
 */
object WeirdChunks {
  import cats.effect.{ContextShift, IO, Timer}
  import cats.effect.concurrent.Ref
  import cats.implicits._
  import fs2.Stream
  import fs2.concurrent.Queue
  import scala.concurrent.ExecutionContext
  import scala.concurrent.duration._

  // Implicit definitions carry explicit types so that resolution does not
  // depend on inference order.
  implicit val t: Timer[IO] = IO.timer(ExecutionContext.global)
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  /** Allocates the shared state: a counter starting at 0 and a queue bounded to 10 messages. */
  def init: IO[(Ref[IO, Int], Queue[IO, String])] =
    (Ref.of[IO, Int](0), Queue.bounded[IO, String](10)).tupled

  /**
   * Waits 200 ms, increments the counter and classifies the new value.
   *
   * @param c counter shared between invocations
   * @return `Right(value)` when the incremented value is even,
   *         `Left("<value> is odd")` otherwise
   */
  def generate(c: Ref[IO, Int]): IO[Either[String, Int]] = {
    // `modify` increments and reads back in one atomic step; a separate
    // `update` followed by `get` could observe another fiber's increment.
    val increment: IO[Int] = c.modify { current =>
      val next = current + 1
      (next, next)
    }
    (IO.sleep(200.millis) *> increment).map { cc =>
      Either.cond(cc % 2 == 0, cc, s"$cc is odd")
    }
  }

  /**
   * Drains the queue, printing the dequeued messages in groups of at most 10
   * elements or whatever arrived within 2 seconds, whichever limit is hit first.
   *
   * @param q queue to consume from
   */
  def pull(q: Queue[IO, String]): Stream[IO, Unit] =
    q.dequeue
      .groupWithin(10, 2.seconds)
      .evalMap(group => IO(println(group.toList)))

  /**
   * Runs the producer for `i` generated values with one consumer alongside it.
   *
   * The consumer is attached once to the whole producer stream. Starting it
   * per generated element (inside the element-level flatMap) would create a
   * fresh `groupWithin` for every element and stop it as soon as that element
   * was handled, so no group could ever accumulate more than one message.
   *
   * NOTE: `concurrently` stops the background consumer when the producer
   * finishes, so messages still buffered at that moment may not be printed.
   *
   * @param i number of values to generate
   */
  def start(i: Int): IO[Unit] =
    Stream
      .eval(init)
      .flatMap { case (c, q) =>
        val producing: Stream[IO, Unit] =
          Stream.repeatEval(generate(c)).take(i.toLong).evalMap {
            case Right(n)    => IO(println(s"$n is fine"))
            case Left(error) => q.enqueue1(error)
          }
        producing.concurrently(pull(q))
      }
      .compile
      .drain
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment