Skip to content

Instantly share code, notes, and snippets.

@oxbowlakes
Last active October 8, 2018 15:33
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save oxbowlakes/a5afe61fc301e4db22216ecb325b3a6b to your computer and use it in GitHub Desktop.
Save oxbowlakes/a5afe61fc301e4db22216ecb325b3a6b to your computer and use it in GitHub Desktop.

Description of Program

I have a program which looks roughly like this

  1. Waits for some files
  2. When they arrive, parses them to extract data
  3. Reads database state
  4. Performs calculation
  5. Saves new state to database
  6. Generates emails on the basis of computed data
  7. Repeat

It's critical that steps 3 & 5 runs within a single database transaction on each iteration, so it's currently a program written in Kleisli[IO, Connection, ?] thus:

// External Input
val waitForFiles:                IO[my.RawInput]       = ???
val parseFiles:   my.RawInput => IO[my.ProcessedInput] = ???

// Database interaction
val readPersistedState:               Kleisli[IO, Connection, my.PersistedState] = ???
val writePersistedState: my.Output => Kleisli[IO, Connection, Unit]              = ???

//Computation
val compute: (my.ProcessedInput, my.PersistedState) => my.Output = ???

//single run
for {
  f <- waitForFiles.liftKleisli[Connection]   //step 1
  g <- parseFiles(f).liftKleisli[Connection]  //setp 2
  h <- readPersistedState                     //step 3
  i =  compute(g, h)                          //setp 4
  _ <- writePersistedState(i)                 //step 5
  _ <- sendEmail(i).liftKleisli[Connection]   //step 6
}
yield ()

We generate one big singleRun: Kleisli[IO, Connection, Unit] and then run it using a library method of ours:

def runTransaction[M[_] : Monad : Catchable, A](ds: DataSource, f: Connection => M[A], isolation: Option[Isolation] = None): M[A]

That is:

val db: IO[DataSource] = ???
val singleRun: Kleisli[IO, Connection, Unit] = /* as above */
IO.ioMonad.forever { //step 7 (spin here)
  db
    .flatMap { ds => 
      runTransaction[IO, Unit](ds, singleRun.run, Some(Serialized))
    }
}

Motivation to use zio.IO

I don't wish a failed email to bail out my whole system and neither do I want the next calculation to wait on the email succeeding. I want to do sth like this:

for {
  q <- Queue.unbounded[my.Output]
 
  x <- waitForFiles
         .flatMap { f =>                          //step 1
           parseFiles(f)                          //step 2
         }
         .flatMap { g => 
           readPersistentState                    //step 3
             .map(compute(g, _))                  //step 4
             .flatMap { i =>      
               writePersistentState(i)            //step 5 
                 .flatMap(_ => q.offer(i)) 
             }
         }
         .forever                                 //step 7 (spin here) 
         .fork               
 
  y <- q
         .take
         .flatMap(attemptSendEmailUntilSucceeded) //step 6 (asynchronously)
         .forever
         .fork
 
  _ <- x.join
  _ <- y.join
}
yield ExitStatus.ExitNow(0)

Where I'm stuck

However, I'm not clear on the best way to turn steps 3 to 5, which must happen in a database transaction ...

readPersistentState
  .map(compute(g, _))
  .flatMap { i => 
    writePersistentState(i).flatMap(_ => q.offer(i)) 
  }

... into a zio.IO whilst creating, threading, committing (or aborting) that transaction. You can assume I can get my hands on an IO[DataSource] in there.

What's the best way of doing this?

  • I assume I'm going to have to write a ZIO version of runTransactionMonad

  • Given that ZIO doesn't ship with typeclass instances for zio.IO I presume I shouldn't use Kleisli[zio.IO[E, ?], Connection, A]. How then to compose readPersistentState and writePersistentState together? That is, given

    val a: Connection => zio.IO[Throwable, H] = ???
    val f: H => I = ???
    val b: I => Connection => zio.IO[Throwable, Unit] = ???
    

Is there no simpler way than:

val c: Connection => zio.IO[Throwable, I] = 
  conn => {
    a(conn).flatMap { h =>
      val i = f(h)
      b(i)(conn).map(_ => i)
    }
  }

Potential solution (which is ugly)

Let's say that I have these:

val readPersistentState:               Connection => zio.IO[Throwable, my.PersistedState] = ???
val writePersistentState: my.Output => Connection => zio.IO[Throwable, Unit]              = ???

def runTransactionMonad[A](
  ds: DataSource, 
  f:  Connection => zio.IO[Throwable, A], 
  isolation: Option[Isolation]
): zio.IO[Throwable, A] = ???

Then my program is:

 for {
  ds <- IO.syncThrowable(unsafeDataSourceLookup)
 
  q <- Queue.unbounded[my.Output]
 
  x <- waitForFiles
         .flatMap { f =>                                   //step 1
           parseFiles(f)                                   //step 2
         }
         .flatMap { g => 
           runTransactionMonad(
             ds,
             conn => 
               readPersistentState(conn).flatMap { h =>     //step 3
                 val i = compute(g, h)                      //step 4
                 writePersistentState(i)(conn).map(_ => i)  //step 5
               }
           
           )
         }
         .flatMap { i =>
           q.offer(i)
         }  
         .forever                                 //step 7 (spin here) 
         .fork               
 
  y <- q
         .take
         .flatMap(attemptSendEmailUntilSucceeded) //step 6 (asynchronously)
         .forever
         .fork
 
  _ <- x.join
  _ <- y.join
}
yield ExitStatus.ExitNow(0)
@iravid
Copy link

iravid commented Oct 8, 2018

Hi Chris, here's a (simplified, untested) snippet that demonstrates how it would look with FS2:

Queue.unbounded(my.Output).flatMap { q =>
  val parser = Stream
    .repeatEval(waitOnFiles)
    .evalMap(parseFiles)
    .evalMap(runTransactionMonad(...))
    .evalTap(q.offer)
    .drain

  val sender = Stream
    .repeatEval(q.take)
    .evalMap(attemptSendEmailUntilSuccceeded)
    .drain
  
   parser.parJoin(sender).compile.drain

I'm not sure where the cats-effect instances stand for ZIO, but if they exist, you could just use FS2 with ZIO as the effect monad.

@oxbowlakes
Copy link
Author

runTransactionMonad(...)

Thanks, but I think you are answering a question that I am not asking. I want to understand how I can get Kleisli composition interacting with the effect

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment