Skip to content

Instantly share code, notes, and snippets.

@dnaumenko
Last active March 28, 2019 14:35
Show Gist options
  • Save dnaumenko/1a4687ef082ab060c1a40826c83f261d to your computer and use it in GitHub Desktop.
Save dnaumenko/1a4687ef082ab060c1a40826c83f261d to your computer and use it in GitHub Desktop.
import cats.data.ReaderT
import cats.effect.{Concurrent, IO, Sync}
import cats.mtl.ApplicativeLocal
import fs2.Stream
/**
 * Small runnable example showing how to read a per-message context through
 * `ApplicativeLocal` while streaming messages with fs2.
 *
 * `scope` overrides the environment only for the effect passed to it. Any
 * effect that should see the overridden context (here: `ask`) therefore has to
 * be run inside the scoped effect, not in a later stream stage.
 */
object UpdateLocalContextProblem extends App {
  type Ctx = String
  type Header = String
  type ReaderIO[A] = ReaderT[IO, Ctx, A]

  /** A message together with the header from which its context is derived. */
  case class Message(payload: String, header: Header)

  /** Derives the context for a message from its header by appending "-updated". */
  def toCtx(header: Header): Ctx = s"$header-updated"

  /**
   * Emits, for every message, the context that is visible while that message
   * is being processed.
   *
   * @param messages messages to process, in order
   * @param local    capability used to read and locally override the context
   */
  case class Consumer[F[_] : Concurrent](messages: Seq[Message])(implicit val local: ApplicativeLocal[F, Ctx]) {

    /**
     * Builds a stream with one element per message: the context derived from
     * that message's header via `toCtx`.
     *
     * @return stream of the per-message contexts, in message order
     */
    def consume(): Stream[F, Ctx] =
      Stream.emits(messages).covary[F].evalMap { msg =>
        // `ask` must be evaluated inside `scope`: the override lasts only for
        // the wrapped effect. Scoping a pure value and asking in a later stage
        // would read the outer context again.
        local.scope(toCtx(msg.header))(local.ask)
      }
  }

  implicit val ioCs: cats.effect.ContextShift[IO] =
    IO.contextShift(scala.concurrent.ExecutionContext.global)

  // Provides the ApplicativeLocal instance for ReaderIO; imported here, before its use below.
  import cats.mtl.implicits._

  val result: List[Ctx] =
    Consumer[ReaderIO](Seq(Message("value1", "header1")))
      .consume()
      .compile
      .toList
      .run("ctx")
      .unsafeRunSync()

  println(result) // prints List(header1-updated): the context derived from the message header
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment