Created
October 10, 2018 11:37
-
-
Save oxbowlakes/25e58e8f434e2473595cb4828adffcd1 to your computer and use it in GitHub Desktop.
Using scalaz ZIO to provide a transactional database commit
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.sql.{Connection, SQLException} | |
import javax.sql.DataSource | |
import scalaz.zio.{IO, KleisliIO} | |
/**
 * Runs database actions inside a JDBC transaction, using scalaz-zio to
 * sequence the commit / rollback / cleanup steps.
 */
object Transactions {

  /**
   * A JDBC transaction isolation level.
   *
   * @param i the matching `java.sql.Connection.TRANSACTION_*` constant
   */
  sealed abstract class Isolation(private[Transactions] val i: Int)

  object Isolation {
    case object Serialized
        extends Isolation(Connection.TRANSACTION_SERIALIZABLE)

    /**
     * No transaction handling: `runTransaction` does not change the
     * connection's auto-commit or isolation settings and performs no
     * commit or rollback.
     */
    case object None extends Isolation(Connection.TRANSACTION_NONE)

    case object ReadCommitted
        extends Isolation(Connection.TRANSACTION_READ_COMMITTED)

    case object ReadUncommitted
        extends Isolation(Connection.TRANSACTION_READ_UNCOMMITTED)

    case object RepeatableRead
        extends Isolation(Connection.TRANSACTION_REPEATABLE_READ)
  }

  /**
   * Obtains a connection from `ds`, runs `f` against it and commits; if `f`
   * (or the commit) fails, a rollback is attempted. Afterwards the
   * connection's previous auto-commit and isolation settings are restored
   * and the connection is closed.
   *
   * Failures of the rollback and of the settings reset are reported through
   * `log` only; they do not change the result of the returned action.
   *
   * @param ds  source of the connection; the connection is closed when the action completes
   * @param f   the action to run with the connection
   * @param log sink for progress and failure messages
   * @param i   isolation level to run under; with `Isolation.None` no
   *            auto-commit/isolation change, commit or rollback is performed
   * @return the result of `f`, or its error
   */
  def runTransaction[E, A](
    ds: DataSource,
    f: KleisliIO[E, Connection, A],
    log: String => IO[Nothing, Unit],
    i: Isolation = Isolation.Serialized
  ): IO[E, A] = {
    val acquire = IO.sync(ds.getConnection)
    val release = (conn: Connection) => IO.sync(conn.close())

    IO.bracket[E, Connection, A](acquire)(release) { conn =>
      /* Action to set autoCommit status */
      val setAutoCommit =
        if (i == Isolation.None) IO.unit else IO.sync(conn.setAutoCommit(false))

      /* Action to set isolation status. Skipped for Isolation.None: the JDBC
         contract states that TRANSACTION_NONE cannot be passed to
         setTransactionIsolation. */
      val setTransactionIsolation =
        if (i == Isolation.None) IO.unit
        else IO.sync(conn.setTransactionIsolation(i.i))

      /* Action to commit the transaction */
      val commit =
        if (i == Isolation.None) IO.unit
        else {
          log(s"Committing transaction $conn ...") *> IO.sync(conn.commit())
        }

      /* The raw rollback call. An SQLException is captured as a typed error
         (same approach as `reset` below) so that `rollback.attempt` can
         actually observe and log it. */
      val rollbackConnection: IO[SQLException, Unit] =
        IO.syncCatch(conn.rollback()) {
          case e: SQLException => e
        }

      /* Action to rollback the transaction */
      val rollback: IO[SQLException, Unit] =
        if (i == Isolation.None) IO.unit
        else {
          log(s"Rolling back transaction $conn ...") *> rollbackConnection
        }

      for {
        /* getAndSet connection state */
        oldIsolation  <- IO.sync(conn.getTransactionIsolation) <* setTransactionIsolation
        oldAutoCommit <- IO.sync(conn.getAutoCommit) <* setAutoCommit
        /* The reset action (which can fail) */
        reset = IO.syncCatch({
          conn.setAutoCommit(oldAutoCommit)
          conn.setTransactionIsolation(oldIsolation)
        }) {
          case e: SQLException => e
        }
        /* Run and commit the transaction, rolling back on
           error and attempting to place it in its original state */
        a <- (f.run(conn) <* commit)
          .onError(r =>
            log(s"Transaction failed on $conn with $r") *> rollback.attempt
              .flatMap {
                case Left(t)  => log(s"Failure during rollback of $conn: $t")
                case Right(_) => IO.unit
              })
          .ensuring(reset.attempt.flatMap {
            case Left(t) =>
              log(s"Unable to reset state of transaction $conn: $t")
            case Right(_) => IO.unit
          })
      } yield a
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
There's a lot you could do using the `KleisliIO` combinators here. Perhaps you want to expose your errors in the form of a `NonEmptyList` of a custom ADT, so that you can get rid of `log`? It doesn't really feel at home there.