Skip to content

Instantly share code, notes, and snippets.

@notxcain
Created July 10, 2019 06:19
Show Gist options
  • Save notxcain/f79ba6a9d7c43f3bb8d40e93067dc351 to your computer and use it in GitHub Desktop.
Save notxcain/f79ba6a9d7c43f3bb8d40e93067dc351 to your computer and use it in GitHub Desktop.
Projection
package aecor.runtime.akkapersistence.readside
import aecor.data.{ EntityEvent, Fold, Folded }
import aecor.runtime.Eventsourced.Versioned
import cats.MonadError
import cats.implicits._
object Projection {
final case class ProjectionError(message: String) extends RuntimeException(message)
def protect[F[_], S, E](
fold: Fold[Folded, S, E]
)(implicit F: MonadError[F, Throwable]): Fold[F, Versioned[S], Versioned[E]] =
Fold(
Versioned(0, fold.initial), {
case (x @ Versioned(version, s), y @ Versioned(seqNr, e)) =>
if (seqNr <= version) {
x.pure[F]
} else if (seqNr == version + 1) {
fold.reduce(s, e) match {
case Folded.Next(a) => Versioned(version + 1, a).pure[F]
case Folded.Impossible => F.raiseError(ProjectionError(s"Illegal fold $x and $y"))
}
} else {
F.raiseError(ProjectionError("Missing event"))
}
}
)
trait ProjectionStore[F[_], K, S] {
def set(key: K, s: S): F[Unit]
def get(key: K): F[S]
}
def pipe[F[_]: MonadError[?[_], Throwable], K, E, S](store: ProjectionStore[F, K, Versioned[S]],
fold: Fold[Folded, S, E],
): fs2.Pipe[F, EntityEvent[K, E], Unit] = { in =>
val protectedFold = Projection.protect(fold)
in.evalMap {
case EntityEvent(key, sequenceNr, payload) =>
for {
es <- store.get(key)
ns <- protectedFold.reduce(es, Versioned(sequenceNr, payload))
_ <- store.set(key, ns).unlessA(ns == es)
} yield ()
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment