Skip to content

Instantly share code, notes, and snippets.

@crakjie
Created October 14, 2019 16:23
Show Gist options
  • Save crakjie/d8389212500bb18c3995162b5d853bc5 to your computer and use it in GitHub Desktop.
Save crakjie/d8389212500bb18c3995162b5d853bc5 to your computer and use it in GitHub Desktop.
cats effect transaction keeper
import cats._
import implicits._
import cats.effect._
import concurrent._
import cats.effect.implicits._
/**
* The transactioner help to make transaction by unsuring the transaction is open before runing task and task are ended before close
* @tparam F
*/
trait Transactioner[F[_]] {
/**
* Open a transaction
* @return Sucess if the transaction is not opened fail if it's already opened
*/
def open: F[Unit]
/**
* start a task
* @return Sucess if the transaction is open fail if it's already close or not opened
*/
def task[A](t: F[A]): F[A]
/**
* signify the end the transaction, sementicaly wait the end of all task and close the transaction
* @return Sucess is the transaction open fail if it's already close or not opened
*/
def close: F[Unit]
}
object Transactioner {
def create[F[_]](implicit F: Concurrent[F]): F[Transactioner[F]] = {
sealed trait State
case class Closing(cpt: Int, d: Deferred[F, Unit]) extends State
case object NotOpened extends State
case object Closed extends State
case class Opened(cpt: Int) extends State
Ref.of[F, State](NotOpened).map { state =>
new Transactioner[F] {
override def open: F[Unit] =
state.modify {
case NotOpened => Opened(0) -> F.unit
case Closed => Closed -> F.raiseError[Unit](new IllegalStateException("Transaction is already closed"))
case st: Opened => st -> F.raiseError[Unit](new IllegalStateException("Transaction is open"))
case st: Closing => st -> F.raiseError[Unit](new IllegalStateException("Transaction is open"))
}.flatten
override def task[A](t: F[A]): F[A] = {
for {
_ <- startTask
a <- t
_ <- endTask
} yield a
}
private def startTask: F[Unit] =
state.modify {
case Opened(cpt) => Opened(cpt + 1) -> F.unit
case NotOpened => NotOpened -> F.raiseError[Unit](new IllegalStateException("Transaction is not opened"))
case Closing(cpt, df) =>
Closing(cpt, df) -> F.raiseError[Unit](new IllegalStateException("Transaction is closed"))
case Closed => Closed -> F.raiseError[Unit](new IllegalStateException("Transaction is closed"))
}.flatten
private def endTask: F[Unit] =
state.modify {
case Opened(cpt) => Opened(cpt - 1) -> F.unit
case Closing(1, df) => Closing(0, df) -> df.complete(())
case Closing(cpt, df) => Closing(cpt - 1, df) -> F.unit
case NotOpened => NotOpened -> F.raiseError[Unit](new IllegalStateException("Transaction is no opended"))
case Closed => Closed -> F.raiseError[Unit](new IllegalStateException("Transaction is closed"))
}.flatten
override def close: F[Unit] = Deferred[F, Unit].flatMap { newValue =>
state.modify {
case Opened(0) =>
Closed -> F.unit
case Opened(cpt) =>
Closing(cpt, newValue) -> newValue.get.onCancel {
//On cancel if it's still closing go back to the open status.
state.update {
case Closing(cpt, _) => Opened(cpt)
case st => st
}
}
case st: Closing =>
st -> F.raiseError[Unit](new IllegalStateException("Transaction is already closing"))
case st @ Closed =>
st -> F.raiseError[Unit](new IllegalStateException("Transaction is closed"))
case st @ NotOpened =>
st -> F.raiseError[Unit](new IllegalStateException("Transaction is not opened"))
}.flatten
}
}
}
}
}
import monix.eval.Task
import org.scalatest.{AsyncFlatSpec, FlatSpec, Matchers}
import monix.execution.Scheduler.Implicits.global
import scala.language.postfixOps
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.concurrent.duration._
class TransactionerTest extends FlatSpec with Matchers {
it should "not allow close on not open transaction" in {
val task = for {
t <- Transactioner.create[Task]
_ <- t.close
} yield {}
assertThrows[IllegalStateException](Await.result(task.runToFuture, Duration.Inf))
}
it should "not allow open transaction twice" in {
val task = for {
t <- Transactioner.create[Task]
_ <- t.open
_ <- t.open
} yield {}
assertThrows[IllegalStateException](Await.result(task.runToFuture, Duration.Inf))
}
it should "not allow open on closed transaction" in {
val task = for {
t <- Transactioner.create[Task]
_ <- t.open
_ <- t.close
_ <- t.open
} yield {}
assertThrows[IllegalStateException](Await.result(task.runToFuture, Duration.Inf))
}
it should "not allow close transaction twice" in {
val task = for {
t <- Transactioner.create[Task]
_ <- t.open
_ <- t.close
_ <- t.close
} yield {}
assertThrows[IllegalStateException](Await.result(task.runToFuture, Duration.Inf))
}
it should "not allow new task on not open transaction" in {
val task = for {
t <- Transactioner.create[Task]
_ <- t.task(Task.unit)
} yield {}
assertThrows[IllegalStateException](Await.result(task.runToFuture, Duration.Inf))
}
it should "not allow new task on closed transaction" in {
val task = for {
t <- Transactioner.create[Task]
_ <- t.open
_ <- t.close
_ <- t.task(Task.unit)
} yield {}
assertThrows[IllegalStateException](Await.result(task.runToFuture, Duration.Inf))
}
it should "allow new task on while transaction is not closed" in {
val task = for {
t <- Transactioner.create[Task]
_ <- t.open
a <- t.task(Task(1))
b <- t.task(Task(2))
_ <- t.close
} yield { a + b }
Await.result(task.runToFuture, Duration.Inf) shouldBe 3
}
it should "allow to call close before the end of tasks" in {
val task = for {
t <- Transactioner.create[Task]
_ <- t.open
a <- t.task(Task(1).delayResult(1 seconds)).asyncBoundary
b <- t.task(Task(2)).asyncBoundary
_ <- t.close
} yield { a + b }
Await.result(task.runToFuture, Duration.Inf) shouldBe 3
}
it should "allow to call close before the end of tasks second way" in {
val task = for {
t <- Transactioner.create[Task]
_ <- t.open
a <- Task.parMap2(
// the close came 500 millis after the start task
t.close.delayExecution(500 millis),
//start task is call imediatly and end task after a second
t.task(Task(1).delayResult(1 seconds)).asyncBoundary
)((_, b) => b)
} yield { a }
Await.result(task.runToFuture, Duration.Inf) shouldBe 1
}
it should "not allow new task on closed transaction even in concurent case" in {
val task = for {
t <- Transactioner.create[Task]
_ <- t.open
a <- Task.parMap2(t.close.delayResult(500 millis),
Task.shift.delayExecution(1 seconds).flatMap(_ => t.task(Task(1))))((_, b) => b)
} yield { a }
assertThrows[IllegalStateException](Await.result(task.runToFuture, Duration.Inf))
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment