Skip to content

Instantly share code, notes, and snippets.

@dimitarg
Created March 31, 2022 15:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dimitarg/aecd27d5c51335afde9106e0b26c33f8 to your computer and use it in GitHub Desktop.
Save dimitarg/aecd27d5c51335afde9106e0b26c33f8 to your computer and use it in GitHub Desktop.
Translated stream behaving badly - what did I mess up?
package repro
import scala.concurrent.duration._
import cats.implicits._
import cats.~>
import cats.arrow.FunctionK
import cats.data.Kleisli
import cats.effect.IO
import fs2.Stream
/** Minimal reproduction comparing two otherwise identical fs2 pipelines.
  *
  * Both pipelines are built by the same private helper and differ only in
  * how each grouped chunk is turned back into a stream:
  *
  *   - [[allK]] evaluates the chunk in [[App]] and then `translate`s the
  *     resulting stream into `IO` via [[runAppK]];
  *   - [[allIO]] evaluates the chunk directly in `IO`.
  *
  * According to the author's notes in [[main]], the translated variant
  * terminates after the first chunk while the plain `IO` variant keeps
  * running.
  */
object Repro {

  /** The trivial ReaderT environment: a `Kleisli` over `IO` whose environment is `Unit`. */
  type App[A] = Kleisli[IO, Unit, A]

  /** Runs an [[App]] program by supplying the `Unit` environment. */
  def runApp[A](prg: App[A]): IO[A] = prg.run(())

  /** [[runApp]] as a natural transformation, in the shape `Stream#translate` expects. */
  val runAppK: App ~> IO = FunctionK.lift(runApp)

  /** Builds the pipeline shared by both variants.
    *
    * An endless stream of `1`s is metered at 100 ms, grouped with
    * `groupWithin(5, 10.seconds)`, and every group is handed to `process`.
    * Each element of the resulting stream is printed, the output is
    * discarded, and a message is printed when the stream finalizes.
    *
    * Keeping this in one place guarantees that the only difference between
    * the two variants is the `process` step.
    *
    * @param process turns one grouped chunk into the stream of elements to emit
    * @return the assembled pipeline; it is expected never to complete on its own
    */
  private def groupedTicks(process: fs2.Chunk[Int] => Stream[IO, Int]): Stream[IO, Unit] =
    Stream(1).repeat
      .covary[IO]
      .metered(100.millis)
      .groupWithin(5, 10.seconds)
      .flatMap(process)
      .evalTap(x => IO.println(x))
      .void
      .onFinalize {
        IO.println("Should-be-infinite stream completed!!!")
      }

  /** Variant whose per-chunk stream lives in [[App]] and is translated to `IO`. */
  val allK: Stream[IO, Unit] =
    groupedTicks { chunk =>
      // Same effect construction as the IO variant, but in App ...
      val processed = ().pure[App].as(chunk)
      // ... so the inner stream has to be translated back into IO.
      Stream.evalUnChunk(processed).translate(runAppK)
    }

  /** Variant whose per-chunk stream is evaluated directly in `IO` (no translation). */
  val allIO: Stream[IO, Unit] =
    groupedTicks { chunk =>
      val processed = ().pure[IO].as(chunk)
      Stream.evalUnChunk(processed)
    }

  /** Runs the translated variant to completion, then the plain `IO` variant.
    *
    * Both runs block the calling thread via `unsafeRunSync`, so the second
    * run only starts once the first stream has terminated.
    */
  def main(args: Array[String]): Unit = {
    import cats.effect.unsafe.implicits.global
    // this unexpectedly completes after the first chunk
    allK.compile.drain.unsafeRunSync()
    // this runs forever as expected
    allIO.compile.drain.unsafeRunSync()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment