Created
July 10, 2019 06:19
-
-
Save notxcain/f79ba6a9d7c43f3bb8d40e93067dc351 to your computer and use it in GitHub Desktop.
Projection
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package aecor.runtime.akkapersistence.readside | |
import aecor.data.{ EntityEvent, Fold, Folded } | |
import aecor.runtime.Eventsourced.Versioned | |
import cats.MonadError | |
import cats.implicits._ | |
/**
  * Building blocks for maintaining a per-key projected state from a stream of
  * versioned entity events.
  */
object Projection {

  /** Error raised when an incoming event cannot be applied to the current projected state. */
  final case class ProjectionError(message: String) extends RuntimeException(message)

  /**
    * Wraps a `Folded`-based fold into a fold over `Versioned` state and events in `F`.
    *
    * The resulting fold starts from `Versioned(0, fold.initial)`. For every incoming event:
    *  - if the event version is not greater than the state version, the state is returned unchanged;
    *  - if the event version is exactly the state version plus one, `fold.reduce` is applied and
    *    the state version is incremented by one;
    *  - if `fold.reduce` yields `Folded.Impossible`, a [[ProjectionError]] is raised in `F`;
    *  - any larger event version raises a [[ProjectionError]] with message "Missing event".
    *
    * @param fold the underlying fold over unversioned state `S` and events `E`
    * @param F    error-raising capability used to report illegal folds and version gaps
    * @return a fold that enforces the version checks described above
    */
  def protect[F[_], S, E](
    fold: Fold[Folded, S, E]
  )(implicit F: MonadError[F, Throwable]): Fold[F, Versioned[S], Versioned[E]] =
    Fold(
      Versioned(0, fold.initial), {
        case (current @ Versioned(version, state), incoming @ Versioned(seqNr, event)) =>
          if (seqNr > version) {
            // The event is newer than the state: it must be the immediate successor.
            if (seqNr == version + 1) {
              fold.reduce(state, event) match {
                case Folded.Impossible =>
                  F.raiseError[Versioned[S]](ProjectionError(s"Illegal fold $current and $incoming"))
                case Folded.Next(next) =>
                  Versioned(version + 1, next).pure[F]
              }
            } else {
              F.raiseError[Versioned[S]](ProjectionError("Missing event"))
            }
          } else {
            // Event version is at or below the state version: keep the state as it is.
            current.pure[F]
          }
      }
    )

  /**
    * Keyed storage for projected state.
    *
    * Note that `get` returns `F[S]` rather than an optional value.
    * NOTE(review): presumably implementations return the initial state for keys that were
    * never written - confirm against the concrete stores.
    */
  trait ProjectionStore[F[_], K, S] {

    /** Stores `s` under `key`. */
    def set(key: K, s: S): F[Unit]

    /** Reads the state stored under `key`. */
    def get(key: K): F[S]
  }

  /**
    * Builds a pipe that applies each incoming `EntityEvent` to the state held in `store`.
    *
    * For every event the current versioned state is read with `store.get`, run through the
    * fold produced by [[protect]], and written back with `store.set` unless the resulting
    * state is equal to the one that was read. One `Unit` is emitted per processed event.
    *
    * @param store storage holding the versioned state for each key
    * @param fold  the fold describing how an event payload changes the state
    */
  def pipe[F[_]: MonadError[?[_], Throwable], K, E, S](
    store: ProjectionStore[F, K, Versioned[S]],
    fold: Fold[Folded, S, E]
  ): fs2.Pipe[F, EntityEvent[K, E], Unit] = { events =>
    val versionedFold = protect[F, S, E](fold)
    events.evalMap {
      case EntityEvent(key, sequenceNr, payload) =>
        store.get(key).flatMap { stored =>
          versionedFold.reduce(stored, Versioned(sequenceNr, payload)).flatMap { updated =>
            // Skip the write when folding left the state untouched.
            store.set(key, updated).unlessA(updated == stored).map(_ => ())
          }
        }
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment