Last active
August 14, 2018 06:43
-
-
Save ndolgov/3a6dc940437c7cd7e5982a25ebf29635 to your computer and use it in GitHub Desktop.
Composing a saga from individual transactions in Scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
interface ObjectStoreTx { | |
CompletableFuture<Void> execute(TxContext ctx); | |
Optional<Throwable> rollback(); | |
} | |
public final class CompletableFutureSaga { | |
private CompletableFuture<TxContext> executeHeadTx(Iterator<ObjectStoreTx> txs, TxContext ctx) { | |
if (!txs.hasNext()) { | |
return CompletableFuture.completedFuture(ctx); | |
} | |
final ObjectStoreTx tx = txs.next(); | |
return tx. | |
execute(ctx). | |
exceptionally(e -> { | |
rollback(tx); | |
throw new RuntimeException("Cancelling saga after tx failure"); | |
}). | |
thenCompose(unit -> { | |
return executeHeadTx(txs, ctx); | |
}); | |
} | |
private void rollback(ObjectStoreTx failedTx) { | |
for (int txIndex = transactions.indexOf(failedTx) - 1; txIndex >= 0; txIndex--) { | |
final ObjectStoreTx tx = transactions.get(txIndex); | |
final Optional<Throwable> e = tx.rollback(); | |
if (e.isPresent()) { | |
logger.error("Failed to roll back " + tx, e.get()); | |
} | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** Simiular to ImperativeFutureSaga but somposes Futures idiomatically. */ | |
class FunctionalFuturesSaga(txs: Seq[ObjectStoreTx]) extends Function0[Future[TxContext]] { | |
def apply(): Future[TxContext] = executeHeadTx(txs, TxContext()) | |
private def executeHeadTx(txs: Seq[ObjectStoreTx], ctx: TxContext) : Future[TxContext] = { | |
txs match { | |
case Nil => Future.successful(ctx) | |
case tx :: tail => | |
tx. | |
execute(ctx). | |
flatMap {_ => executeHeadTx(tail, ctx)}. | |
recoverWith { case e: Exception => | |
e match { | |
case _: NestedTxException => // the actual failed tx has been logged | |
case _ => logger.error(s"Failed to execute ${tx.toString}") | |
} | |
tx.rollback() match { | |
case Success(_) => | |
case Failure(ue) => logger.error(s"Failed to roll back ${tx.toString}", ue) | |
} | |
Future.failed(new NestedTxException(e)) | |
} | |
} | |
} | |
/** To distinguish between the actual error happening and its propagation through the chain of Futures */ | |
private final class NestedTxException(e: Exception) extends RuntimeException(e) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** A saga transaction whose action is represented by a Future */ | |
trait ObjectStoreTx { | |
def execute(ctx: TxContext): Future[Unit] | |
def rollback(): Try[Unit] | |
} | |
/** A saga comprised of a sequence of ObjectStoreTxs, each to be executed one by one but potentially on different threads. */ | |
class ImperativeFutureSaga(txs: Seq[ObjectStoreTx]) extends Function0[Future[TxContext]] { | |
private val txIndex = new AtomicInteger(0) | |
def apply(): Future[TxContext] = { | |
val endOfSagaSignal = Promise[TxContext]() | |
Future { | |
run(txs.head, TxContext(), endOfSagaSignal) | |
} | |
endOfSagaSignal.future | |
} | |
/** | |
* Run a saga tx. If successful, either trigger next tx execution or signal the end of saga. | |
* In case of a failure, trigger rollback of the executed txs. | |
* @param tx the transaction to execute | |
* @param ctx the state to hand off to the next transaction | |
* @param endOfSagaSignal the promise to complete at the end of the saga | |
*/ | |
private def run(tx: ObjectStoreTx, ctx: TxContext, endOfSagaSignal: Promise[TxContext]) : Unit = { | |
tx. | |
execute(ctx). | |
onComplete { | |
case Success(_) => | |
val nextIndex = txIndex.incrementAndGet() | |
if (nextIndex < txs.length) { | |
run(txs(nextIndex), ctx, endOfSagaSignal) | |
} else { | |
endOfSagaSignal.success(ctx) | |
} | |
case Failure(e) => | |
rollback(txIndex.get()) | |
endOfSagaSignal.failure(e) | |
} | |
} | |
/** Revert previously executed stages but applying the corresponding compensation actions (in reverse order) */ | |
private def rollback(failedTxIndex: Int) : Unit = { | |
for (i <- failedTxIndex-1 to 0 by -1) { | |
val tx = txs(i) | |
tx.rollback() match { | |
case Success(_) => | |
case Failure(e) => logger.error(s"Failed to roll back ${tx.toString}", e) | |
} | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** A saga transaction based on the Continuation monad */ | |
trait ObjectStoreTx[A] { | |
self => | |
type Tx[B, R] = B => Future[R] | |
type Continuation[B, R] = (Tx[B, R]) => Future[R] | |
def run[R]: Continuation[A, R] | |
final def map[B](f: A => B): ObjectStoreTx[B] = new ObjectStoreTx[B] { | |
def run[R]: Continuation[B, R] = { g: Tx[B, R] => self.run(f andThen g) } | |
} | |
final def flatMap[B](f: A => ObjectStoreTx[B]): ObjectStoreTx[B] = new ObjectStoreTx[B] { | |
def run[R]: Continuation[B, R] = { g: Tx[B, R] => self.run(f(_).run(g)) } | |
} | |
final def toFuture: Future[A] = run { a: A => Future.successful(a) } | |
} | |
/** Helper functions to convert an action represented by a Future into a saga transaction */ | |
trait ObjectStoreTxs { | |
def toRevertableTx[A](action: => Future[A])(rollback: A => Try[Unit])(implicit ec: ExecutionContext): ObjectStoreTx[A] = new ObjectStoreTx[A] { | |
override def run[R]: Continuation[A, R] = (restOfPipeline: Tx[A, R]) => | |
for { | |
a <- action | |
result <- restOfPipeline(a).transform(identity, { ex => rollback(a); ex }) | |
} yield result | |
} | |
} | |
/** Compose three transactions into a saga monadically */ | |
object ObjectStoreSaga { | |
def apply(writeTx: ObjectStoreTx[(String, Long)], objId: ObjectId, catalog: ObjectCatalog, storage: ObjectStorage): Future[(String, Long, String)] = { | |
val saga: ObjectStoreTx[SagaResult] = for { | |
(tmpLocation, fileSize) <- toRevertableTx(writeArrayToTmpLocation(objId, obj, path, storage)) { case (filePath, _) => deleteFile(filePath, storage) } | |
revision <- toRevertableTx(catalog.createRevision(objId, catalog))(rev => catalog.forgetRevision(rev)) | |
permanentPath <- toRevertableTx(renameTmpFile(tmpLocation, storage.persistentPath(objId, revision), storage))(_ => Success(())) | |
} yield (permanentPath, fileSize, revision) | |
saga.toFuture | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** A saga transaction whose action is represented by a Try */ | |
trait ObjectStoreTx { | |
def run(ctx: TxContext): Try[Unit] | |
def rollback(): Try[Unit] | |
} | |
/** A saga comprised of a sequence of ObjectStoreTxs executed on the calling thread */ | |
class TryListSaga(txs: List[ObjectStoreTx]) extends Function0[Try[TxContext]] { | |
def apply(): Try[TxContext] = { | |
val ctx = TxContext() | |
for (txIndex <- txs.indices) { | |
val tx = txs(txIndex) | |
tx.run(ctx) match { | |
case Failure(e) => | |
rollback(txIndex) | |
return Failure(e) | |
case _ => | |
} | |
} | |
Success(ctx) | |
} | |
private def rollback(failedTxIndex: Int) : Unit = { | |
for (i <- failedTxIndex-1 to 0 by -1) { | |
val tx = txs(i) | |
tx.rollback() match { | |
case Failure(e) => logger.error(s"Failed to roll back ${tx.toString}", e) | |
case _ => | |
} | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** The state shared among the transactions of a saga */ | |
trait TxContext { | |
def getLong(name: String): Long | |
def setLong(name: String, value: Long): Unit | |
} | |
/** An object id with a revision assigned by the storage */ | |
case class ObjectId(name: String, revision: Option[String]) | |
/** In real life files would be in HDFS */ | |
trait ObjectStorage { | |
def createFile(obj: Array[Byte], path: String): Try[Unit] | |
def deleteFile(path: String): Option[Try[Unit]] | |
} | |
/** In real life it would invoke a DAO to modify the DB */ | |
trait ObjectCatalog { | |
def createRevision(objId: ObjectId): Try[String] | |
def forgetRevision(objId: ObjectId): Try[Unit] | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment