Skip to content

Instantly share code, notes, and snippets.

@ShahOdin
Created December 21, 2020 13:12
Show Gist options
  • Save ShahOdin/5addc8d37bd9760d213e2c94ced6c008 to your computer and use it in GitHub Desktop.
Save ShahOdin/5addc8d37bd9760d213e2c94ced6c008 to your computer and use it in GitHub Desktop.
Demonstration of using Cats Effect's Ref to share state between concurrent fs2 streams
import cats.{Applicative, Functor}
import cats.effect.concurrent.Ref
import cats.effect.{Concurrent, ExitCode, IO, IOApp, Timer}
import fs2.Stream
import cats.syntax.functor._
import cats.syntax.applicative._
import scala.concurrent.duration.DurationInt
/** Demo application: two concurrently running fs2 streams share a single
  * `Ref[F, Int]` counter. One stream increments it, the other prints it.
  */
object DemoApp extends IOApp {

  /** Capability to bump the shared counter by one. */
  trait IncrementMeaningOfLife[F[_]] {
    def increment: F[Unit]
  }

  object IncrementMeaningOfLife {

    /** Builds an incrementer backed by `ref` and emits it as the single
      * element of a stream.
      *
      * @param ref the shared counter to be incremented
      * @return a stream that evaluates a pure effect yielding the incrementer
      */
    def apply[F[_]: Applicative](ref: Ref[F, Int]): Stream[F, IncrementMeaningOfLife[F]] = {
      val incrementer: IncrementMeaningOfLife[F] = new IncrementMeaningOfLife[F] {
        // Adds one to whatever value the Ref currently holds.
        override def increment: F[Unit] = ref.update(current => current + 1)
      }
      Stream.eval(Applicative[F].pure(incrementer))
    }
  }

  /** Capability to print the current value of the shared counter. */
  trait LogMeaningOfLife[F[_]] {
    def log: F[Unit]
  }

  object LogMeaningOfLife {

    /** Builds a logger that reads `ref` and prints its value to stdout.
      *
      * @param ref the shared counter to be read
      */
    def apply[F[_]: Functor](ref: Ref[F, Int]): LogMeaningOfLife[F] = {
      val logger: LogMeaningOfLife[F] = new LogMeaningOfLife[F] {
        // The println runs inside `map`, i.e. when the `get` effect is run.
        override def log: F[Unit] = ref.get.map(current => println(current))
      }
      logger
    }
  }

  /** Creates the counter (starting at 0), then runs an increment stream
    * metered at 1 second and a logging stream metered at 3 seconds in
    * parallel, interrupting both after 10 seconds.
    */
  def stream[F[_]: Timer: Concurrent]: Stream[F, Unit] =
    Stream.eval(Ref.of[F, Int](0)).flatMap { counter =>
      IncrementMeaningOfLife(counter).flatMap { incrementer =>
        val logger = LogMeaningOfLife(counter)
        val increments = Stream.repeatEval(incrementer.increment).metered(1.second)
        val reports = Stream.repeatEval(logger.log).metered(3.seconds)

        Stream(increments, reports)
          .parJoinUnbounded
          .interruptAfter(10.seconds)
          .map(_ => ())
      }
    }

  /** Entry point: drains the demo stream in `IO` and exits successfully. */
  override def run(args: List[String]): IO[ExitCode] =
    stream[IO].compile.drain.map(_ => ExitCode.Success)
}
@ShahOdin
Copy link
Author

prints:

2
5
8

Process finished with exit code 0

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment