Skip to content

Instantly share code, notes, and snippets.

@yuk1ty
Created July 1, 2020 17:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yuk1ty/96ab766d6460524c4903a9219c0cc939 to your computer and use it in GitHub Desktop.
Save yuk1ty/96ab766d6460524c4903a9219c0cc939 to your computer and use it in GitHub Desktop.
Saga implementation for com.twitter.util.Future
package jp.airtrack.bid.util.saga
import com.twitter.util.Future
import jp.airtrack.bid.util.saga.Saga.SagaErr
sealed abstract class Saga[A] {
def map[B](f: A => B): Saga[B] = flatMap(a => Saga.Succeeded(f(a)))
def flatMap[B](f: A => Saga[B]): Saga[B] = Saga.FlatMap(this, (a: A) => f(a))
def flatten[B](implicit ev: A <:< Saga[B]): Saga[B] = flatMap(ev)
def transact: Future[A] = {
def interpret[X](saga: Saga[X]): Future[(X, Future[Unit])] = saga match {
case Saga.Succeeded(value) => Future.value((value, Future.Unit))
case s: Saga.Step[X, Throwable] => ??? // TODO
case Saga.FlatMap(chained: Saga[Any], continuation: (Any => Saga[X])) =>
interpret(chained).flatMap {
case (v, prevStepCompensator) => ??? // TODO
}
// TODO Par pattern
}
interpret(this).map(_._1).rescue {
case e: SagaErr => e.compensator.flatMap(_ => Future.exception(e))
}
}
// private def race[B](fa: Future[A], fb: Future[B]): Future[Unit] = fa.select(fb).unit
}
object Saga {
private case class Succeeded[A](value: A) extends Saga[A]
private case class Step[A, E >: Throwable](action: Future[A], compensate: Either[E, A] => Future[Unit])
extends Saga[A]
private case class FlatMap[A, B](fa: Saga[A], f: A => Saga[B]) extends Saga[B]
// private class Par[A, B, C](fa: Saga[A],
// fb: Saga[B],
// combine: (A, B) => C,
// compensate: (Future[Unit], Future[Unit]) => Future[Unit])
// extends Saga[C]
private case class SagaErr(cause: Throwable, compensator: Future[Unit]) extends Throwable(cause)
def compensate[A](comp: Future[A], compensation: Future[Unit]): Saga[A] =
compensate(comp, (_: Either[_, _]) => compensation)
def compensate[E <: Throwable, A](comp: Future[A], compensation: Either[E, A] => Future[Unit]): Saga[A] =
Step(comp, compensation)
def compensateIfSuccess[A](request: Future[A], compensation: A => Future[Unit]): Saga[A] =
compensate(request, (result: Either[Throwable, A]) => result.fold(_ => Future.Unit, compensation))
def compensateIfFailure[E <: Throwable, A](request: Future[A], compensation: E => Future[Unit]): Saga[A] =
compensate[E, A](request, (result: Either[E, A]) => result.fold(compensation, _ => Future.Unit))
def succeed[A](value: A): Saga[A] = Succeeded(Future.value(value))
def fail[A](err: Throwable): Saga[A] = noCompensate(Future.exception(err))
def noCompensate[A](comp: Future[A]): Saga[A] = Step(comp, (_: Either[Throwable, A]) => Future.Unit)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment