Skip to content

Instantly share code, notes, and snippets.

@ndolgov
Last active August 14, 2018 06:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ndolgov/3a6dc940437c7cd7e5982a25ebf29635 to your computer and use it in GitHub Desktop.
Save ndolgov/3a6dc940437c7cd7e5982a25ebf29635 to your computer and use it in GitHub Desktop.
Composing a saga from individual transactions in Scala
/**
 * A single step of a saga: an asynchronous action paired with a compensating rollback.
 */
interface ObjectStoreTx {
/**
 * Starts the action of this step.
 *
 * @param ctx the context object handed from step to step by the saga
 * @return a future that completes when the action finishes; the saga treats an
 *         exceptional completion as a failure of this step
 */
CompletableFuture<Void> execute(TxContext ctx);
/**
 * Applies the compensating action of this step.
 *
 * @return the error raised by the compensation, if any; the saga logs a present value
 *         as a rollback failure and treats an empty one as success
 */
Optional<Throwable> rollback();
}
public final class CompletableFutureSaga {
private CompletableFuture<TxContext> executeHeadTx(Iterator<ObjectStoreTx> txs, TxContext ctx) {
if (!txs.hasNext()) {
return CompletableFuture.completedFuture(ctx);
}
final ObjectStoreTx tx = txs.next();
return tx.
execute(ctx).
exceptionally(e -> {
rollback(tx);
throw new RuntimeException("Cancelling saga after tx failure");
}).
thenCompose(unit -> {
return executeHeadTx(txs, ctx);
});
}
private void rollback(ObjectStoreTx failedTx) {
for (int txIndex = transactions.indexOf(failedTx) - 1; txIndex >= 0; txIndex--) {
final ObjectStoreTx tx = transactions.get(txIndex);
final Optional<Throwable> e = tx.rollback();
if (e.isPresent()) {
logger.error("Failed to roll back " + tx, e.get());
}
}
}
}
/**
 * Similar to ImperativeFutureSaga but composes Futures idiomatically.
 *
 * The transactions are executed one by one. A failure anywhere in the chain is propagated
 * back through every transaction that has been started, and each of them (including the
 * one that failed) applies its rollback on the way.
 */
class FunctionalFuturesSaga(txs: Seq[ObjectStoreTx]) extends Function0[Future[TxContext]] {
  /**
   * Runs the saga from its first transaction.
   * @return a future completed with the saga context on success, or failed once a transaction
   *         fails and the rollbacks have been attempted
   */
  def apply(): Future[TxContext] = executeHeadTx(txs, TxContext())

  /**
   * Execute the head transaction and, if it succeeds, recurse into the tail.
   * @param txs the transactions still to be executed
   * @param ctx the state to hand off to the next transaction
   */
  private def executeHeadTx(txs: Seq[ObjectStoreTx], ctx: TxContext): Future[TxContext] = {
    // Seq() / +: match any Seq implementation; Nil / :: would throw a MatchError for non-List inputs
    txs match {
      case Seq() => Future.successful(ctx)

      case tx +: tail =>
        tx.
          execute(ctx).
          flatMap { _ => executeHeadTx(tail, ctx) }.
          recoverWith { case e: Exception =>
            e match {
              case _: NestedTxException => // the actual failed tx has been logged
              case _ => logger.error(s"Failed to execute ${tx.toString}", e)
            }

            tx.rollback() match {
              case Success(_) =>
              case Failure(ue) => logger.error(s"Failed to roll back ${tx.toString}", ue)
            }

            // wrap the error so that the preceding transactions know it has been reported already
            Future.failed(new NestedTxException(e))
          }
    }
  }

  /** To distinguish between the actual error happening and its propagation through the chain of Futures */
  private final class NestedTxException(e: Exception) extends RuntimeException(e)
}
/**
 * A saga transaction whose action is represented by a Future.
 * The action and its compensation are declared separately so that a saga can revert
 * the transaction after a later failure.
 */
trait ObjectStoreTx {
/**
 * Start the action of this transaction.
 * @param ctx the state shared among the transactions of the saga
 * @return a Future representing the action; the sagas treat a failed Future as a failed transaction
 */
def execute(ctx: TxContext): Future[Unit]
/**
 * Apply the compensating action of this transaction.
 * @return Success if the compensation worked; the sagas log a Failure and carry on
 */
def rollback(): Try[Unit]
}
/**
 * A saga comprised of a sequence of ObjectStoreTxs, each to be executed one by one but potentially on different threads.
 *
 * The position of the running transaction is passed along as a parameter instead of being kept in
 * an instance field, so one saga instance can be applied more than once.
 */
class ImperativeFutureSaga(txs: Seq[ObjectStoreTx]) extends Function0[Future[TxContext]] {
  import scala.util.control.NonFatal

  /**
   * Start the saga asynchronously.
   * @return a future completed with the saga context after the last transaction succeeds (immediately
   *         for an empty saga), or failed with the error of the first failed transaction
   */
  def apply(): Future[TxContext] = {
    val endOfSagaSignal = Promise[TxContext]()

    Future {
      run(0, TxContext(), endOfSagaSignal)
    }.failed.foreach { e =>
      // an unexpected error while kicking off the saga must not leave the caller waiting forever
      endOfSagaSignal.tryFailure(e)
    }

    endOfSagaSignal.future
  }

  /**
   * Run a saga tx. If successful, either trigger next tx execution or signal the end of saga.
   * In case of a failure, trigger rollback of the executed txs.
   * @param index the position in txs of the transaction to execute
   * @param ctx the state to hand off to the next transaction
   * @param endOfSagaSignal the promise to complete at the end of the saga
   */
  private def run(index: Int, ctx: TxContext, endOfSagaSignal: Promise[TxContext]): Unit = {
    if (index >= txs.length) {
      endOfSagaSignal.success(ctx)
    } else {
      // a tx that throws instead of returning a failed Future is treated as a failed tx too
      val execution: Future[Unit] =
        try txs(index).execute(ctx)
        catch { case NonFatal(e) => Future.failed(e) }

      execution.onComplete {
        case Success(_) =>
          run(index + 1, ctx, endOfSagaSignal)

        case Failure(e) =>
          // always signal the failure, even if a compensation action throws
          try rollback(index)
          finally endOfSagaSignal.failure(e)
      }
    }
  }

  /**
   * Revert previously executed stages by applying the corresponding compensation actions (in reverse order).
   * @param failedTxIndex the position of the failed transaction; only the transactions before it are reverted
   */
  private def rollback(failedTxIndex: Int): Unit = {
    for (i <- failedTxIndex - 1 to 0 by -1) {
      val tx = txs(i)
      tx.rollback() match {
        case Success(_) =>
        case Failure(e) => logger.error(s"Failed to roll back ${tx.toString}", e)
      }
    }
  }
}
/**
 * A saga transaction based on the Continuation monad.
 * A transaction produces a value of type A and hands it to the rest of the pipeline (its
 * continuation); map and flatMap make transactions composable in for comprehensions.
 * @tparam A the type of the value produced by this transaction
 */
trait ObjectStoreTx[A] {
// alias used by the anonymous subclasses below to reach the outer transaction
self =>
/** The rest of the pipeline: consumes a value of type B and eventually produces the final result R */
type Tx[B, R] = B => Future[R]
/** A computation that, given the rest of the pipeline, produces the final result R */
type Continuation[B, R] = (Tx[B, R]) => Future[R]
/**
 * @tparam R the type of the final result of the whole pipeline
 * @return a function that takes the rest of the pipeline and returns the Future of the final result
 */
def run[R]: Continuation[A, R]
/** Create a transaction that applies f to the value of this one before passing it to the continuation */
final def map[B](f: A => B): ObjectStoreTx[B] = new ObjectStoreTx[B] {
def run[R]: Continuation[B, R] = { g: Tx[B, R] => self.run(f andThen g) }
}
/** Create a transaction that runs this one and then, with the same continuation, the transaction f builds from its value */
final def flatMap[B](f: A => ObjectStoreTx[B]): ObjectStoreTx[B] = new ObjectStoreTx[B] {
def run[R]: Continuation[B, R] = { g: Tx[B, R] => self.run(f(_).run(g)) }
}
/** Run this transaction with a final continuation that wraps the produced value into a successful Future */
final def toFuture: Future[A] = run { a: A => Future.successful(a) }
}
/** Helper functions to convert an action represented by a Future into a saga transaction */
trait ObjectStoreTxs {
  /**
   * Pair an action with its compensation.
   *
   * The action is evaluated when the transaction is run. If the rest of the pipeline fails afterwards,
   * the compensation is applied to the value the action produced and the original error is propagated.
   * A compensation that fails (or throws) does not hide the original error: its exception is attached
   * to the original one as a suppressed exception.
   *
   * @param action the by-name action of the transaction
   * @param rollback the compensation, applied to the value produced by the action
   * @param ec the execution context used to chain the futures
   * @tparam A the type of the value produced by the action
   * @return a transaction that can be composed with other transactions
   */
  def toRevertableTx[A](action: => Future[A])(rollback: A => Try[Unit])(implicit ec: ExecutionContext): ObjectStoreTx[A] = new ObjectStoreTx[A] {
    override def run[R]: Continuation[A, R] = (restOfPipeline: Tx[A, R]) =>
      for {
        a <- action
        result <- restOfPipeline(a).transform(identity, { ex =>
          // Try(...).flatten covers both a returned Failure and a thrown exception
          Try(rollback(a)).flatten.failed.foreach { rollbackError =>
            // addSuppressed rejects self-suppression
            if (rollbackError ne ex) ex.addSuppressed(rollbackError)
          }
          ex
        })
      } yield result
  }
}
/**
 * Compose three transactions into a saga monadically:
 * write the object to a temporary location, create a catalog revision, and rename the temporary
 * file to its permanent path. The first two steps have compensations; the rename has none.
 *
 * NOTE(review): `obj`, `path`, `SagaResult`, `toRevertableTx`, `writeArrayToTmpLocation`, `deleteFile`
 * and `renameTmpFile` are not declared in the visible snippet, and the `writeTx` parameter is unused.
 */
object ObjectStoreSaga {
  /**
   * @param writeTx not used by the visible code
   * @param objId the id of the object to store
   * @param catalog the catalog to register the new revision with
   * @param storage the storage to write the object to
   * @return the future of (permanent path, file size, revision)
   */
  def apply(writeTx: ObjectStoreTx[(String, Long)], objId: ObjectId, catalog: ObjectCatalog, storage: ObjectStorage): Future[(String, Long, String)] = {
    // The result of the first step is bound to a plain name: a tuple pattern in a generator makes
    // Scala 2 call withFilter, which the continuation-based ObjectStoreTx does not define.
    val saga: ObjectStoreTx[SagaResult] = for {
      // tmpFile is (temporary location, file size)
      tmpFile <- toRevertableTx(writeArrayToTmpLocation(objId, obj, path, storage)) { case (filePath, _) => deleteFile(filePath, storage) }
      revision <- toRevertableTx(catalog.createRevision(objId, catalog))(rev => catalog.forgetRevision(rev))
      permanentPath <- toRevertableTx(renameTmpFile(tmpFile._1, storage.persistentPath(objId, revision), storage))(_ => Success(()))
    } yield (permanentPath, tmpFile._2, revision)

    saga.toFuture
  }
}
/**
 * A saga transaction whose action is represented by a Try.
 * The action and its compensation are declared separately so that a saga can revert
 * the transaction after a later failure.
 */
trait ObjectStoreTx {
/**
 * Execute the action of this transaction.
 * @param ctx the state shared among the transactions of the saga
 * @return Success if the action worked; the saga stops and rolls back on a Failure
 */
def run(ctx: TxContext): Try[Unit]
/**
 * Apply the compensating action of this transaction.
 * @return Success if the compensation worked; the saga logs a Failure and carries on
 */
def rollback(): Try[Unit]
}
/**
 * A saga comprised of a sequence of ObjectStoreTxs executed on the calling thread.
 * The transactions run in list order; the first failure stops the saga and the transactions
 * executed before it are rolled back in reverse order.
 */
class TryListSaga(txs: List[ObjectStoreTx]) extends Function0[Try[TxContext]] {
  /**
   * Run the saga.
   * @return Success with the saga context if every transaction succeeds, otherwise the Failure of
   *         the first failed transaction
   */
  def apply(): Try[TxContext] = {
    val ctx = TxContext()

    // walks the list once, keeping the executed transactions most-recent-first for the rollback
    @scala.annotation.tailrec
    def runFrom(remaining: List[ObjectStoreTx], executed: List[ObjectStoreTx]): Try[TxContext] =
      remaining match {
        case Nil => Success(ctx)

        case tx :: rest =>
          tx.run(ctx) match {
            case Failure(e) =>
              rollback(executed)
              Failure(e)

            case _ => runFrom(rest, tx :: executed)
          }
      }

    runFrom(txs, Nil)
  }

  /**
   * Apply the compensation actions of the given transactions in the given order.
   * A failed compensation is logged and does not stop the remaining ones.
   * @param executed the successfully executed transactions, the most recent one first
   */
  private def rollback(executed: List[ObjectStoreTx]): Unit = {
    executed.foreach { tx =>
      tx.rollback() match {
        case Failure(e) => logger.error(s"Failed to roll back ${tx.toString}", e)
        case _ =>
      }
    }
  }
}
/**
 * The state shared among the transactions of a saga.
 * NOTE(review): the sagas create instances with `TxContext()`, so a companion factory is
 * presumably defined elsewhere - confirm.
 */
trait TxContext {
/**
 * @param name the name of the value to read
 * @return the Long stored under the name; behavior for an unknown name is not visible here
 */
def getLong(name: String): Long
/**
 * @param name the name to store the value under
 * @param value the value to store
 */
def setLong(name: String, value: Long): Unit
}
/**
 * An object id with a revision assigned by the storage.
 * @param name the name of the object
 * @param revision the revision of the object; presumably None until a revision has been assigned - confirm
 */
case class ObjectId(name: String, revision: Option[String])
/** File operations of the object store. In real life files would be in HDFS. */
trait ObjectStorage {
/**
 * @param obj the content of the file
 * @param path the path to create the file at
 * @return the outcome of the operation
 */
def createFile(obj: Array[Byte], path: String): Try[Unit]
/**
 * @param path the path of the file to delete
 * @return the outcome of the operation, if any
 *         NOTE(review): the meaning of None is not visible here (possibly "no such file") - confirm
 */
def deleteFile(path: String): Option[Try[Unit]]
}
/** Revision bookkeeping of the object store. In real life it would invoke a DAO to modify the DB. */
trait ObjectCatalog {
/**
 * @param objId the id of the object to create a revision for
 * @return the created revision, or the Failure of the operation
 */
def createRevision(objId: ObjectId): Try[String]
/**
 * @param objId the id of the object whose revision is to be forgotten
 *              (presumably the revision carried by the id - confirm)
 * @return the outcome of the operation
 */
def forgetRevision(objId: ObjectId): Try[Unit]
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment