Skip to content

Instantly share code, notes, and snippets.

@gvolpe
Last active March 15, 2022 20:27
Show Gist options
  • Star 37 You must be signed in to star a gist
  • Fork 7 You must be signed in to fork a gist
  • Save gvolpe/d4facceedf12ab7870934cbf9d6174e9 to your computer and use it in GitHub Desktop.
Save gvolpe/d4facceedf12ab7870934cbf9d6174e9 to your computer and use it in GitHub Desktop.
Shared State in pure Functional Programming

Shared State in pure Functional Programming

Newcomers to Functional Programming are often very confused about the proper way to share state without breaking purity and end up having a mix of pure and impure code that defeats the purpose of having pure FP code in the first place.

Reason why I decided to write up a beginner friendly guide :)

Use Case

We have a program that runs three computations at the same time and updates the internal state to keep track of the tasks that have been completed. When all the tasks are completed we request the final state and print it out.

You should get an output similar to the following one:

Starting process #1
Starting process #2
Starting process #3
  ... 3 seconds
Done #2
  ... 5 seconds
Done #1
  ... 10 seconds
Done #3
List(#2, #1, #3)

We'll use the concurrency primitive Ref[IO, List[String]] to represent our internal state because it's a great fit.

Getting started

So this is how we might decide to start writing our code:

import cats.effect._
import cats.effect.concurrent.Ref
import cats.instances.list._
import cats.syntax.all._

import scala.concurrent.duration._

object sharedstate extends IOApp {

  var myState: Ref[IO, List[String]] = _

  def putStrLn(str: String): IO[Unit] = IO(println(str))

  val process1: IO[Unit] = {
    putStrLn("Starting process #1") *>
      IO.sleep(5.seconds) *>
      myState.update(_ ++ List("#1")) *>
      putStrLn("Done #1")
  }

  val process2: IO[Unit] = {
    putStrLn("Starting process #2") *>
      IO.sleep(3.seconds) *>
      myState.update(_ ++ List("#2")) *>
      putStrLn("Done #2")
  }

  val process3: IO[Unit] = {
    putStrLn("Starting process #3") *>
      IO.sleep(10.seconds) *>
      myState.update(_ ++ List("#3")) *>
      putStrLn("Done #3")
  }

  def masterProcess: IO[Unit] = {
    myState = Ref.of[IO, List[String]](List.empty[String]).unsafeRunSync()
    val ioa = List(process1, process2, process3).parSequence.void
    ioa *> myState.get.flatMap(rs => putStrLn(rs.toString))
  }

  override def run(args: List[String]): IO[ExitCode] =
    masterProcess.as(ExitCode.Success)

}

We defined a var myState: Ref[IO, List[String]] initialized as null so we can create it on startup and all the child processes can have access to it. A so called global state.

But now we try to run our application and we encounter our first ugly problem: NullPointerException on line 19. All the processes are defined by using myState which has not yet been initialized. So an easy way to fix it is to define all our processes as lazy val.

lazy val process1: IO[Unit] = ???
lazy val process2: IO[Unit] = ???
lazy val process3: IO[Unit] = ???

That worked, brilliant! We have an application that meets the business criteria and most importantly it works!

Rethinking our application

But let's take a step back and review our code once again, there are at least two pieces of code that should have caught your attention:

var myState: Ref[IO, List[String]] = _

We are using var and initializing our state to null, OMG! Also the workaround of lazy val should get you thinking...

And here's the second obvious one:

myState = Ref.of[IO, List[String]](List.empty[String]).unsafeRunSync()

We require our myState to be of type Ref[IO, List[String] but the smart constructor gives us an IO[Ref[IO, List[String]]] so we are "forced" to call unsafeRunSync() to get our desired type. And there's a reason for that, the creation of a Ref[F, A] is side effectful, therefore it needs to be wrapped in IO to keep the purity.

But wait a minute... that unsafeRunSync() is something that you should only see at the edge of your program, most commonly in the main method that is invoked by the JVM and that is impure by nature (of type Unit). But because we are using IOApp we shouldn't be calling any operations which names are prefixed with unsafe.

You say to yourself, yes, I know this is bad and ugly but I don't know a better way to share the state between different computations and this works. But we know you have heard that funcional programming is beautiful so why doing this?

Functional Programming

Okay, can we do better? Of course we do and you wouldn't believe how simple it is!

Let's get started by getting rid of that ugly var myState initialized to null and pass it as parameter to the processes that need to access it:

import cats.effect._
import cats.effect.concurrent.Ref
import cats.instances.list._
import cats.syntax.all._

import scala.concurrent.duration._

object sharedstate extends IOApp {

  def putStrLn(str: String): IO[Unit] = IO(println(str))

  def process1(myState: Ref[IO, List[String]]): IO[Unit] = {
    putStrLn("Starting process #1") *>
      IO.sleep(5.seconds) *>
      myState.update(_ ++ List("#1")) *>
      putStrLn("Done #1")
  }

  def process2(myState: Ref[IO, List[String]]): IO[Unit] = {
    putStrLn("Starting process #2") *>
      IO.sleep(3.seconds) *>
      myState.update(_ ++ List("#2")) *>
      putStrLn("Done #2")
  }

  def process3(myState: Ref[IO, List[String]]): IO[Unit] = {
    putStrLn("Starting process #3") *>
      IO.sleep(10.seconds) *>
      myState.update(_ ++ List("#3")) *>
      putStrLn("Done #3")
  }

  def masterProcess: IO[Unit] = {
    val myState: Ref[IO, List[String]] = Ref.of[IO, List[String]](List.empty[String]).unsafeRunSync()

    val ioa = List(process1(myState), process2(myState), process3(myState)).parSequence.void
    ioa *> myState.get.flatMap(rs => putStrLn(rs.toString))
  }

  override def run(args: List[String]): IO[ExitCode] =
    masterProcess.as(ExitCode.Success)

}

Great! We got rid of that global state and we are now passing our Refas a parameter. Remember that it is a concurrency primitive meant to be accesed and modified in concurrent scenarios, so we are safe here.

Notice how all our processes are now defined as def processN(myState: Ref[IO, List[String]]).

A well known method: flatMap!

Now, we still have that unsafeRunSync() hanging around our code, how can we get rid of it? The answer is flatMap!!!

def masterProcess: IO[Unit] =
  Ref.of[IO, List[String]](List.empty[String]).flatMap { myState =>
    val ioa = List(process1(myState), process2(myState), process3(myState)).parSequence.void
    ioa *> myState.get.flatMap(rs => putStrLn(rs.toString))
  }

You only need to call flatMap once up in the call chain where you call the processes to make sure they all share the same state. If you don't do this, a new Ref will be created every time you flatMap (remember creating a Ref is side effectful!) and thus your processes will not be sharing the same state changing the behavior of your program.

We now have a purely functional code that shares state in a simple and pure fashion. Here's the entire FP program:

import cats.effect._
import cats.effect.concurrent.Ref
import cats.instances.list._
import cats.syntax.all._

import scala.concurrent.duration._

object sharedstate extends IOApp {

  def putStrLn(str: String): IO[Unit] = IO(println(str))

  def process1(myState: Ref[IO, List[String]]): IO[Unit] = {
    putStrLn("Starting process #1") *>
      IO.sleep(5.seconds) *>
      myState.update(_ ++ List("#1")) *>
      putStrLn("Done #1")
  }

  def process2(myState: Ref[IO, List[String]]): IO[Unit] = {
    putStrLn("Starting process #2") *>
      IO.sleep(3.seconds) *>
      myState.update(_ ++ List("#2")) *>
      putStrLn("Done #2")
  }

  def process3(myState: Ref[IO, List[String]]): IO[Unit] = {
    putStrLn("Starting process #3") *>
      IO.sleep(10.seconds) *>
      myState.update(_ ++ List("#3")) *>
      putStrLn("Done #3")
  }

  def masterProcess: IO[Unit] =
    Ref.of[IO, List[String]](List.empty[String]).flatMap { myState =>
      val ioa = List(process1(myState), process2(myState), process3(myState)).parSequence.void
      ioa *> myState.get.flatMap(rs => putStrLn(rs.toString))
    }

  override def run(args: List[String]): IO[ExitCode] =
    masterProcess.as(ExitCode.Success)

}

Applying the technique in other libraries

Although in the example above we only see how it's done with the Cats Effect library, this principle expands to other FP libraries as well.

For example, when writing an http4s application you might need to create an HttpClient that needs to be used by more than one of your services. So again, create it at startup and flatMap it once:

object HttpServer extends StreamApp[IO] {

  override def stream(args: List[String], requestShutdown: IO[Unit]): Stream[IO, ExitCode] =
    for {
      httpClient <- Http1Client.stream[IO]()
      endpoint1  = new HttpEndpointOne[IO](httpClient)
      endpoint2  = new HttpEndpointTwo[IO](httpClient)
      exitCode   <- BlazeBuilder[F]
                      .bindHttp(8080, "0.0.0.0")
                      .mountService(endpoint1)
                      .mountService(endpoint2)
                      .serve
    } yield exitCode

}

class HttpEndpointOne[F[_]](client: Client[F]) { ... }
class HttpEndpointTwo[F[_]](client: Client[F]) { ... }

When writing fs2 applications you can apply the same technique, for example between processes that share a Queue, Topic, Signal, Semaphore, etc.

Remember that if you are forced to call unsafeRunSync() other than in your main method it might be a code smell.

Other explanations

The question about shared state has been asked thousand times in the Gitter channels of Cats, Fs2, Http4s, etc. Here are a few magnificent examples and explanations by @SystemFW:

Conclusion

This simple and powerful technique can be applied absolutely everywhere!

In simple terms, remind yourself about this: "FlatMap once and pass the reference as a parameter!"

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment