Skip to content

Instantly share code, notes, and snippets.

@changlinli
Created June 24, 2018 02:23
Show Gist options
  • Save changlinli/0594a96d69556cb7844563de1992f46c to your computer and use it in GitHub Desktop.
Save changlinli/0594a96d69556cb7844563de1992f46c to your computer and use it in GitHub Desktop.
Example of ReaderT with fs2
object Example {
import fs2._
import fs2.async.mutable._
import cats._
import cats.data._
import cats.implicits._
/**
 * Builds an `Effect` instance for `Kleisli[G, R, ?]` by pinning the reader
 * environment to a fixed value.
 *
 * This exists only because cats-effect keeps the trait expressing the
 * Concurrent instance of Kleisli package private; otherwise we could extend
 * that trait directly instead of re-implementing each abstract method of our
 * super classes by delegation.
 *
 * The environment type parameter is deliberately named `R`: every method
 * below introduces its own result type parameter `A`, and reusing `A` for the
 * environment would shadow it, turning `Kleisli[G, R, A]` into
 * `Kleisli[G, A, A]` and breaking the overrides.
 *
 * @param state environment fed to a Kleisli whenever `runAsync` has to run it
 *              down to `G`
 * @tparam G underlying effect type; must have an `Effect` instance
 * @tparam R environment type of the Kleisli
 * @return an `Effect` for Kleisli over `G` with environment `R`
 */
def readerTEffectInstance[G[_]: Effect, R](
    state: R
): Effect[({ type L[X] = Kleisli[G, R, X] })#L] =
  new Effect[({ type L[X] = Kleisli[G, R, X] })#L] {
    // Everything except runAsync is forwarded to the Kleisli instance that
    // cats-effect already provides.
    private[this] val delegate = Concurrent.catsKleisliConcurrent[G, R]

    // Running a Kleisli asynchronously needs an environment: supply `state`
    // and hand the resulting G[A] to G's own Effect instance.
    override def runAsync[A](fa: Kleisli[G, R, A])(
        cb: Either[Throwable, A] => IO[Unit]): IO[Unit] =
      Effect[G].runAsync(fa.run(state))(cb)

    override def async[A](k: (Either[Throwable, A] => Unit) => Unit): Kleisli[G, R, A] =
      delegate.async(k)

    override def suspend[A](thunk: => Kleisli[G, R, A]): Kleisli[G, R, A] =
      delegate.suspend(thunk)

    override def flatMap[A, B](fa: Kleisli[G, R, A])(f: A => Kleisli[G, R, B]): Kleisli[G, R, B] =
      delegate.flatMap(fa)(f)

    override def tailRecM[A, B](a: A)(f: A => Kleisli[G, R, Either[A, B]]): Kleisli[G, R, B] =
      delegate.tailRecM(a)(f)

    override def raiseError[A](e: Throwable): Kleisli[G, R, A] =
      delegate.raiseError(e)

    override def handleErrorWith[A](fa: Kleisli[G, R, A])(
        f: Throwable => Kleisli[G, R, A]): Kleisli[G, R, A] =
      delegate.handleErrorWith(fa)(f)

    override def pure[A](x: A): Kleisli[G, R, A] =
      delegate.pure(x)
  }
/**
 * An IO computation that reads a `Ref[IO, State]` from its ReaderT
 * environment and produces an `A`.
 */
type RefIO[State, A] = ReaderT[IO, Ref[IO, State], A]
/**
 * Turns a Ref-reading computation into a StateT by running it against a
 * freshly allocated Ref seeded with the incoming state, then reading that
 * Ref back to obtain the outgoing state.
 *
 * This is unsafe: it is only correct when nothing else is concurrently
 * running against the same Ref. Between the moment the reader computation
 * finishes and the moment the Ref is read back for the StateT result, the
 * contents of the Ref might have been mutated by something else.
 *
 * Note as well that unsafeToStateT is NOT the inverse of fromStateT: it
 * creates a new Ref on every run, and that Ref replaces whatever Ref a
 * reader produced by fromStateT would otherwise have been given.
 *
 * @param value computation that expects a Ref holding the state
 * @return a StateT yielding the Ref's final contents together with the result
 */
def unsafeToStateT[State, A](value: RefIO[State, A]): StateT[IO, State, A] =
  StateT[IO, State, A] { initial =>
    Ref[IO, State](initial).flatMap { freshRef =>
      value.run(freshRef).flatMap { outcome =>
        freshRef.get.map(finalState => (finalState, outcome))
      }
    }
  }
/**
 * Re-expresses a StateT computation as a reader over a Ref: the current
 * contents of the Ref are used as the input state, the StateT is run, the
 * resulting state is written back to the Ref and the computed value is
 * returned.
 *
 * The `Monoid` bound on `State` is part of the signature but is not used by
 * the body.
 *
 * NOTE(review): reading the Ref and writing it back are separate steps with
 * the StateT run in between, so a write made to the same Ref by something
 * else during that window would presumably be overwritten — confirm before
 * relying on this under concurrency.
 *
 * @param stateT state-threading computation to adapt
 * @return a reader that threads the state through the supplied Ref
 */
def fromStateT[State: Monoid, A](stateT: StateT[IO, State, A]): RefIO[State, A] =
  Kleisli[IO, Ref[IO, State], A] { ref =>
    ref.get.flatMap { before =>
      stateT.run(before).flatMap { case (after, result) =>
        ref.setSync(after).map(_ => result)
      }
    }
  }
/**
 * A single-element stream in StateIO that prepends "b" to the state and then
 * prints the whole state, prefixed with `label`.
 *
 * @param label prefix identifying this stream in the printed output
 */
def stream(label: String): Stream[StateIO, Unit] = {
  // Prepend the marker element to the log held in the state.
  val appendMarker = StateT.modify[IO, List[String]]("b" :: _)
  // Render the current state and print it; the println runs when the StateT is run.
  val printState = StateT
    .inspect[IO, List[String], String](_.toString)
    .map(rendered => println(s"$label: $rendered"))
  val step = appendMarker.flatMap(_ => printState).map(_ => ())
  Stream.eval(Effect[StateIO].pure(())).evalMap[Unit](_ => step)
}
/** IO computation that threads a `List[String]` as StateT state; the effect type of `stream`. */
type StateIO[A] = StateT[IO, List[String], A]
/** IO computation that reads a `Ref[IO, List[String]]` from its ReaderT environment. */
type RefIOStr[A] = ReaderT[IO, Ref[IO, List[String]], A]
/**
 * Natural transformation from the StateT encoding to the Ref-reader encoding,
 * implemented with fromStateT; used to `translate` streams between the two.
 */
val stateIOToRefIOStr: StateIO ~> RefIOStr = new (StateIO ~> RefIOStr) {
  override def apply[A](stateful: StateIO[A]): RefIOStr[A] =
    fromStateT[List[String], A](stateful)
}
/**
 * End-to-end example: allocates one Ref holding an empty list, prepends "a"
 * to it, then drains two translated streams in sequence against that same
 * Ref — the first wired to an interrupt signal, the second not.
 */
val exampleIO = {
  Ref[IO, List[String]](List.empty).flatMap { sharedRef =>
    // Effect instance for the reader type, with the environment pinned to the shared Ref.
    implicit val effectInstance = readerTEffectInstance[IO, Ref[IO, List[String]]](sharedRef)

    // Both runs use the same StateIO stream, moved into the Ref-reader effect.
    def refStream(label: String) = stream(label).translate(stateIOToRefIOStr)

    val program = for {
      ref <- Kleisli.ask[IO, Ref[IO, List[String]]]
      _ <- Kleisli.liftF(ref.modify("a" :: _))
      interrupt <- Signal[RefIOStr, Boolean](false)
      _ <- refStream("with interrupt")
        .interruptWhen(interrupt)
        .compile
        .drain
      // This second stream keeps adding to the same state that the
      // "with interrupt" stream wrote to, because both read the same Ref.
      _ <- refStream("without interrupt")
        .compile
        .drain
    } yield ()

    program.run(sharedRef)
  }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment