Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save debasishg/7f4fa0a9d5aae65f605fbed23d18a281 to your computer and use it in GitHub Desktop.
Save debasishg/7f4fa0a9d5aae65f605fbed23d18a281 to your computer and use it in GitHub Desktop.
package cqrs
package eventstore
import zio.*
import zio.prelude.*
import zio.prelude.fx.*
/** Generic helpers for `ZPure` programs whose input and output state share one type `S`.
  *
  * Everything here is a thin, type-directed wrapper over the corresponding `ZPure` operation.
  */
object BaseSyntax:

  /** A `ZPure` over state `S`, environment `R`, error `E` and result `A` whose log type is `Nothing`. */
  type Program[S, R, E, A] = ZPure[Nothing, S, S, R, E, A]

  /** Same shape as [[Program]], but with log entries of type `W`. */
  type LoggedProgram[W, S, R, E, A] = ZPure[W, S, S, R, E, A]

  /** Program that succeeds with `a`. */
  def pure[S, R, A](a: A): Program[S, R, Nothing, A] =
    ZPure.succeed(a)

  /** Program that succeeds with `()`. */
  def unit[S, R]: Program[S, R, Nothing, Unit] =
    ZPure.unit

  /** Program that fails with `t`. */
  def raiseError[S, R, E](t: => E): Program[S, R, E, Nothing] =
    ZPure.fail(t)

  /** Succeeds with `()` when `cond` holds, otherwise fails with `t`. */
  def assertThat[S, R, E](cond: => Boolean, t: => E): Program[S, R, E, Unit] =
    if !cond then raiseError(t) else unit

  /** Succeeds with `()` when `cond` does NOT hold, otherwise fails with `t`. */
  def assertThatNot[S, R, E](cond: => Boolean, t: => E): Program[S, R, E, Unit] =
    if cond then raiseError(t) else unit

  /** Succeeds with the content of `a`, or fails with `t` when `a` is `None`. */
  def extractOption[S, R, E, A](a: Option[A], t: => E): Program[S, R, E, A] =
    a.fold[Program[S, R, E, A]](raiseError(t))(pure(_))

  /** Reads the current state. */
  def get[S, R]: Program[S, R, Nothing, S] =
    ZPure.get[S]

  /** Replaces the current state with `s`. */
  def set[S, R](s: S): Program[S, R, Nothing, Unit] =
    ZPure.set(s)

  /** Same as the state-replacing `set`, but with a caller-chosen error type `E`. */
  def setWithError[S, R, E](s: S): Program[S, R, E, Unit] =
    ZPure.set(s)

  /** Replaces the current state with `f` applied to it. */
  def update[S, R](f: S => S): Program[S, R, Nothing, Unit] =
    ZPure.update(f)

  /** Reads the current state and projects it through `f`. */
  def inspect[S, R, A](f: S => A): Program[S, R, Nothing, A] =
    get[S, R].map(f)

  /** Reads the service `R` from the environment and projects it through `f`. */
  def inquire[S, R: Tag, A](f: R => A): Program[S, R, Nothing, A] =
    ZPure.service[S, R].map(f)

  /** Appends `w` to the log. */
  def log[W, S, R, E](w: W): LoggedProgram[W, S, R, E, Unit] =
    ZPure.log(w)

  /** Overload of `set` for logs: re-emits every entry of `previouslog`, in order. */
  def set[W, S, R](previouslog: Chunk[W]): LoggedProgram[W, S, R, Nothing, Unit] =
    ZPure.forEach(previouslog)(entry => log[W, S, R, Nothing](entry)).unit

  /** Re-emits `previouslog` and then installs `previousstate` as the current state. */
  def restore[W, S, R](previouslog: Chunk[W], previousstate: S): LoggedProgram[W, S, R, Nothing, Unit] =
    set[W, S, R](previouslog) *> set[S, R](previousstate)
package cqrs
package domain
package mastermanagement
import zio.*
import eventstore.*
import Error.*
import Error.ValidationError.*
import Event.*
import Fact.*
import Transport.*
// Domain syntax for "mastermanagement": BaseSyntax specialised to state `Instance`,
// environment `Any`, error `Error` and log entries of type `Value`.
// The constructor is private; `transport` is used further down to convert logged values to raw values.
final class Syntax private (transport: Transport):
// Program over `Instance` that can fail with `Error` and writes no log.
type Program[A] = BaseSyntax.Program[Instance, Any, Error, A]
// Program over `Instance` that can fail with `Error` and logs `Value` entries.
type LoggedProgram[A] = BaseSyntax.LoggedProgram[Value, Instance, Any, Error, A]
// BaseSyntax accessors with the state type fixed to `Instance` and the environment to `Any`.
val get = BaseSyntax.get[Instance, Any]
// Two type arguments select the state-replacing overload of BaseSyntax.set (the log overload takes three).
val set = BaseSyntax.set[Instance, Any]
val update = BaseSyntax.update[Instance, Any]
// Like `set`, but with the error type already fixed to `Error`.
def setWithError = BaseSyntax.setWithError[Instance, Any, Error]
def inspect[A] = BaseSyntax.inspect[Instance, Any, A]
import BaseSyntax.*
/** Applies `event` to the current state and records it in the log.
  *
  * The event is handed to `transition` with the sequence number that follows the master's
  * current `latest`; afterwards the updated master is logged together with the event as a `Value`.
  */
def lift(event: Event)(using transition: Transition): LoggedProgram[Unit] =
  inspect(_.master.latest)
    .flatMap(previous => transition(event, previous + 1))
    .flatMap(_ => inspect(_.master))
    .flatMap(master => log[Value, Instance, Any, Error](Value(master, event)))
/** Decides whether a snapshot event has to be appended after `raws` and, if so, produces it.
  *
  * The weight of `raws` is the sum of `eventData.length + MinSnapshotEventSize` over all raw values,
  * skipping those whose category is `Created` while `aggregateLatest > 1`. When the weight exceeds
  * `DefaultSnapshotThreshold`, a `SnapshotTaken` event built from the current master and index is lifted.
  *
  * @param raws     raw values that were just produced
  * @param instance state to start from
  * @return the values logged while snapshotting together with the resulting instance;
  *         fails with the cause reported by the underlying program
  */
private def snapshot(raws: RawValues, instance: Instance): ZIO[Any, Any, (Values, Instance)] =
  // The weight depends only on `raws`, so it is computed once, outside the pure program.
  val weight = raws.foldLeft(0)((acc, raw) =>
    if raw.eventCategory == Category.Created && raw.aggregateLatest > 1 then acc
    else acc + raw.eventData.length + MinSnapshotEventSize
  )
  val withsnapshot = for
    _ <- set(instance)
    _ <-
      if weight > DefaultSnapshotThreshold then
        for
          m <- inspect(_.master)
          currentindex <- inspect(_.index)
          _ <- lift(SnapshotTaken(m.snapshot(currentindex)))
        yield ()
      else unit
  yield ()
  val outcome: ZIO[Any, Any, (Values, Instance)] =
    withsnapshot.runAll(instance) match
      case (values, Right((updated, _))) => ZIO.succeed((values, updated))
      case (_, Left(cause)) => ZIO.fail(cause)
  // Diagnostics go through ZIO logging instead of a println hidden inside the pure program.
  ZIO.logDebug(s"weight $weight $DefaultSnapshotThreshold") *> outcome
extension (program: LoggedProgram[Unit])

  /** Runs `program` from `Instance.empty`.
    *
    * @return the logged values with the final instance on success; on failure an empty log
    *         with the failure converted via `toCause`
    */
  def runFactsEither: (Values, Either[zio.Cause[Error], Instance]) =
    val (facts, outcome) = program.runAll(Instance.empty)
    outcome match
      case Right((instance, _)) => (facts, Right(instance))
      case Left(cause) => (Chunk.empty, Left(cause.toCause))

  /** Runs `program` and converts the logged values to raw values tagged with `tags`.
    *
    * After conversion `snapshot` is consulted; when it logged exactly one value, that value is
    * converted as well and appended to the raw values.
    */
  def runFacts(tags: Tags): ZIO[Any, Any, (RawValues, Instance)] =
    runFactsEither match
      case (_, Left(cause)) => ZIO.fail(cause)
      case (facts, Right(instance)) =>
        for
          raws <- facts.mapZIO(transport.toRawValue(_, tags))
          (taken, current) <- snapshot(raws, instance)
          extended <-
            if taken.size == 1 then transport.toRawValue(taken(0), tags).map(raws :+ _)
            else ZIO.succeed(raws)
        yield (extended, current)
/** The transition used by `lift` and by event replay unless another one is supplied. */
final given instanceTransition: Transition = new Transition

/** State transition of the aggregate: applies a single event to the current `Instance`.
  *
  * The number of `Nested` wrappers around an event selects the level it targets:
  * three for the current iteration, two for the current revision, one for the current version,
  * none for the master itself.
  */
final class Transition:

  /** Applies `event` to the current state.
    *
    * @param event  the event to apply
    * @param latest sequence number carried by the event; except for `SnapshotTaken` it must be
    *               exactly one more than the master's current `latest`
    * @return a program that fails with `EventNotInSequence` when the sequence check fails and with
    *         `UnhandledEvent` when no case matches the event
    */
  def apply(event: Event, latest: Int): Program[Unit] =
    import Entity.EntityHolder

    // Created and SnapshotTaken both carry a complete master: install it and select the
    // version / revision / iteration addressed by its `currentindex` (1-based).
    def adopt(m: Master) =
      update(i =>
        val (iv, ir, iiter) = m.currentindex
        val v = m.entities(iv - 1)
        val r = v.entities(ir - 1)
        val iter = r.entities(iiter - 1)
        i.copy(m, v, r, iter)
      )

    for
      previous <- inspect(_.master.latest)
      _ <- event match
        case SnapshotTaken(_) => unit
        // every other event, Created included, has to be the next one in sequence
        case _ =>
          assertThat(latest - previous == 1, EventNotInSequence(s"expected ${previous + 1}, but found ${latest}, event ${event}"))
      _ <- event match
        // --- iteration level ---
        case Nested(Nested(Nested(nested))) =>
          update(i =>
            val iter = nested match
              case Deleted => i.iteration
              case Frozen(index: Int) => i.revision.entities(index - 1).freeze
              case AddedProperty(property) => i.iteration.add(property)
              case RemovedProperty(property) => i.iteration.remove(property)
              case NewEntity(ie: EntityHolder) =>
                i.iteration.copy(index = ie.index, workable = ie.workable)
            val (r, iternew) = nested match
              case NewEntity(_) => (i.revision.add(iter), iter)
              case Deleted =>
                // after a removal the last remaining iteration becomes the current one
                val r = i.revision.remove(iter)
                (r, r.entities.last.unfreeze)
              case _ => (i.revision.update(iter), iter)
            val v = i.version.update(r)
            val m = i.master.update(v)
            i.copy(m, v, r, iternew)
          )
        // --- revision level ---
        case Nested(Nested(nested)) =>
          update(i =>
            val r = nested match
              case Deleted => i.revision
              case Frozen(index: Int) => i.version.entities(index - 1).freeze
              case AddedProperty(property) => i.revision.add(property)
              case RemovedProperty(property) => i.revision.remove(property)
              case NewEntity(re: EntityHolder) =>
                val ie = re.entitystack.last
                val iter = i.iteration.copy(index = ie.index, workable = ie.workable)
                i.revision.copy(index = re.index, workable = re.workable, entities = Vector(iter))
            val (v, rnew) = nested match
              case NewEntity(_) => (i.version.add(r), r)
              case Deleted =>
                val v = i.version.remove(r)
                (v, v.entities.last.unfreeze)
              case _ => (i.version.update(r), r)
            val m = i.master.update(v)
            nested match
              case NewEntity(_) =>
                // a new revision also selects its (single) iteration
                val iter = r.entities.last
                i.copy(m, v, r, iter)
              case _ => i.copy(m, v, rnew)
          )
        // --- version level ---
        case Nested(nested) =>
          update(i =>
            val v = nested match
              case Deleted => i.version
              case Frozen(index: Int) => i.master.entities(index - 1).freeze
              case AddedProperty(property) => i.version.add(property)
              case RemovedProperty(property) => i.version.remove(property)
              case NewEntity(ve: EntityHolder) =>
                val ie = ve.entitystack.head
                val re = ve.entitystack.last
                val iter = i.iteration.copy(index = ie.index, workable = ie.workable)
                val r =
                  i.revision.copy(index = re.index, workable = re.workable, entities = Vector(iter))
                i.version.copy(index = ve.index, workable = ve.workable, entities = Vector(r))
            val (m, vnew) = nested match
              case NewEntity(_) => (i.master.add(v), v)
              case Deleted =>
                val m = i.master.remove(v)
                (m, m.entities.last.unfreeze)
              case _ => (i.master.update(v), v)
            nested match
              case NewEntity(_) =>
                // a new version also selects its (single) revision and iteration
                val r = v.entities.last
                val iter = r.entities.last
                i.copy(m, v, r, iter)
              case _ => i.copy(m, vnew)
          )
        // --- master level ---
        case Created(m: Master) => adopt(m)
        case SnapshotTaken(m: Master) => adopt(m)
        case Unfrozen((iv, ir, iiter)) =>
          update(i =>
            val m = i.master.unfreeze
            val v = m.entities(iv - 1).unfreeze
            val r = v.entities(ir - 1).unfreeze
            val iter = r.entities(iiter - 1).unfreeze
            i.copy(m, v, r, iter)
          )
        case Deleted => update(i => i.copy(i.master.delete, Version.empty, Revision.empty, Iteration.empty))
        case AddedProperty(property) => update(i => i.copy(i.master.add(property)))
        case RemovedProperty(property) => update(i => i.copy(i.master.remove(property)))
        case _ => raiseError(UnhandledEvent(event.toString))
    yield ()
// --- deletion ---
// Deleting the master is an ordinary change on the master itself.
def deleteMaster = change(_.master, Deleted)
// The other delete* commands hand the PARENT aggregate to `delete`, which checks that it is
// workable and holds more than one entity, and derives the nesting level from the parent's type.
def deleteVersion = delete(_.master)
def deleteRevision = delete(_.version)
def deleteIteration = delete(_.revision)
// --- adding a property at the selected level; `change` checks workability and wraps the event ---
def addPropertyMaster(property: Property) = change(_.master, AddedProperty(property))
def addPropertyVersion(property: Property) = change(_.version, AddedProperty(property))
def addPropertyRevision(property: Property) = change(_.revision, AddedProperty(property))
def addPropertyIteration(property: Property) = change(_.iteration, AddedProperty(property))
// --- removing a property (passed as a String) at the selected level ---
def removePropertyMaster(property: String) = change(_.master, RemovedProperty(property))
def removePropertyVersion(property: String) = change(_.version, RemovedProperty(property))
def removePropertyRevision(property: String) = change(_.revision, RemovedProperty(property))
def removePropertyIteration(property: String) = change(_.iteration, RemovedProperty(property))
/** Lifts a `Created` event for a fresh master identified by `uuid`.
  *
  * The master is built with one version holding one revision holding one iteration,
  * each with empty properties, and `(1, 1, 1)` as its last constructor argument.
  */
def createMaster(uuid: Uuid) =
  val firstIteration = Iteration(Properties.empty, true, 1)
  val firstRevision = Revision(Properties.empty, true, 1, Vector(firstIteration))
  val firstVersion = Version(Properties.empty, true, 1, Vector(firstRevision))
  val initial = Master(uuid, 1, Properties.empty, false, true, Vector(firstVersion), (1, 1, 1))
  lift(Created(initial))
/** Freezes the current iteration, revision and version, then lifts a new version.
  *
  * The new version takes the master's `nextIndex` and carries an entity stack of two
  * entities, both with index 1.
  */
def newVersion =
  get.flatMap(current =>
    val iterationSeed = Entity(index = 1)
    val revisionSeed = Entity(index = 1)
    val versionSeed = Entity(index = current.master.nextIndex, Vector(iterationSeed, revisionSeed))
    for
      _ <- change(_.iteration, Frozen(current.iteration))
      _ <- change(_.revision, Frozen(current.revision))
      _ <- change(_.version, Frozen(current.version))
      _ <- lift(Nested(NewEntity(versionSeed)))
    yield ()
  )
/** Freezes the current iteration and revision, then lifts a new revision.
  *
  * The new revision takes the current version's `nextIndex` and carries one entity with index 1.
  */
def newRevision =
  get.flatMap(current =>
    val iterationSeed = Entity(index = 1)
    val revisionSeed = Entity(index = current.version.nextIndex, Vector(iterationSeed))
    for
      _ <- change(_.iteration, Frozen(current.iteration))
      _ <- change(_.revision, Frozen(current.revision))
      _ <- lift(Nested(Nested(NewEntity(revisionSeed))))
    yield ()
  )
/** Freezes the current iteration, then lifts a new iteration that takes the current revision's `nextIndex`. */
def newIteration =
  get.flatMap(current =>
    val iterationSeed = Entity(index = current.revision.nextIndex)
    for
      _ <- change(_.iteration, Frozen(current.iteration))
      _ <- lift(Nested(Nested(Nested(NewEntity(iterationSeed)))))
    yield ()
  )
/** Selects the (version, revision, iteration) addressed by the 1-based `index`.
  *
  * Every component is validated against the entities of its parent; the first component that
  * is out of range fails the program with `InvalidInput`. On success `Unfrozen(index)` is lifted.
  */
def selectIndex(index: (Int, Int, Int)) =
  // Returns the entity at the 1-based `position`, or fails when `position` is out of range.
  def check[A <: Entity[A]](position: Int, candidates: Vector[A], root: Master): Program[A] =
    assertThat(
      1 <= position && position <= candidates.size,
      InvalidInput(s"index out of range $index, $position not in [1, ${candidates.size}], aggregateroot: $root")
    ).map(_ => candidates(position - 1))

  val (versionIndex, revisionIndex, iterationIndex) = index
  for
    root <- inspect(_.master)
    version <- check(versionIndex, root.entities, root)
    revision <- check(revisionIndex, version.entities, root)
    // the iteration is only validated, its value is not needed
    _ <- check(iterationIndex, revision.entities, root)
    _ <- lift(Unfrozen(index))
  yield ()
/** Selects the last iteration of the last revision of the last version of the master. */
def selectLatest =
  inspect(_.master).flatMap(root =>
    val version = root.entities.last
    val revision = version.entities.last
    val iteration = revision.entities.last
    selectIndex((version.index, revision.index, iteration.index))
  )
/** Lifts a `Deleted` event for the current child of the aggregate selected by `f`.
  *
  * Fails with `NotWorkable` when the aggregate is not workable and with `InvalidState` when it
  * holds only one entity. The nesting depth of the event follows the aggregate's type.
  */
private def delete(f: Instance => Aggregate[?, ?]) =
  // Wraps `Deleted` once per level between the master and the child that is removed.
  def deletionFor(parent: Aggregate[?, ?]): Event =
    parent match
      case _: Revision => Nested(Nested(Nested(Deleted)))
      case _: Version => Nested(Nested(Deleted))
      case _: Master => Nested(Deleted)

  for
    target <- inspect(f)
    _ <- assertThat(target.workable, ValidationError.NotWorkable(s"Cannot delete entity in $target"))
    current <- get
    _ <- assertThat(
      target.entities.size > 1,
      ValidationError.InvalidState(s"Cannot delete last entity ${current.index} for $target")
    )
    _ <- lift(deletionFor(target))
  yield ()
/** Lifts `event` against the part of the instance selected by `f`.
  *
  * Fails with `NotWorkable` when the selected part is not workable. The event is wrapped in
  * `Nested` once per level below the master: none for a master, three for an iteration.
  */
private def change(f: Instance => Workable[?], event: Event) =
  // Adds the nesting that addresses `event` to the level `selected` belongs to.
  def addressedTo(selected: Workable[?]): Event =
    selected match
      case _: Iteration => Nested(Nested(Nested(event)))
      case _: Revision => Nested(Nested(event))
      case _: Version => Nested(event)
      case _: Master => event

  for
    selected <- inspect(f)
    _ <- assertThat(selected.workable, ValidationError.NotWorkable(s"$selected event $event"))
    _ <- lift(addressedTo(selected))
  yield ()
end Syntax
/** Companion of the `Syntax` class; the only place that can call its private constructor. */
object Syntax:

  /** Layer that provides a `Syntax` built from the `Transport` service in the environment. */
  val layer = ZLayer.fromZIO(makeLayer)

  // Fetches the Transport service and wraps it in a new Syntax.
  private def makeLayer =
    ZIO.service[Transport].map(transport => new Syntax(transport))
// example usage of domain "mastermanagement"
// Sample payload type; the example program below stores it in a property via `AnyElem`.
case class Person(lastname: String, firstname: String, age: Int)
// Codec derived for Person (`Codec` / `deriveCodec` are imported outside this excerpt).
given Codec[Person] = deriveCodec[Person]
// Sample payload type with twelve Double components; the example program below stores it via `AnyElem`.
// NOTE(review): the layout (e.g. 3x3 part followed by three extra components) is not stated here — confirm.
case class Matrix(
m1: Double,
m2: Double,
m3: Double,
m4: Double,
m5: Double,
m6: Double,
m7: Double,
m8: Double,
m9: Double,
m10: Double,
m11: Double,
m12: Double
)
// Codec derived for Matrix (`Codec` / `deriveCodec` are imported outside this excerpt).
given Codec[Matrix] = deriveCodec[Matrix]
object Matrix:
// m1, m5 and m9 are 1, every other component is 0.
final val identity = Matrix(1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0)
import Properties.*
import Properties.given
// Example program: creates a master for `uuid` and then runs a fixed script of property changes,
// new versions/revisions/iterations, a deletion and index selections against it.
def test(uuid: Uuid): LoggedProgram[Unit] =
// NOTE(review): `toList.toString` yields the "List(a, b, ...)" rendering of the 200 characters,
// not a 200-character string (`mkString` would) — confirm this is intended.
val largebytes = util.Random.alphanumeric.take(200).toList.toString.nn
// One round of the script; the steps depend on the selection left behind by the previous ones.
val modify =
for
_ <- addPropertyMaster("name" -> StringElem("this is my name."))
_ <- addPropertyMaster("description" -> StringElem("this is my description."))
_ <- addPropertyVersion("creator" -> StringElem("Bob"))
_ <- addPropertyRevision("size" -> IntElem(4711))
// one property per element kind on the current iteration
_ <- addPropertyIteration("pi" -> DoubleElem(3.14159))
_ <- addPropertyIteration("somebytes" -> ByteArrayElem("blablabla".getBytes.nn))
_ <- addPropertyIteration("largebytes" -> StringElem(largebytes))
_ <- addPropertyIteration("happy" -> BooleanElem(true))
_ <- addPropertyIteration("noidea" -> NullElem)
_ <- addPropertyIteration("dontcare" -> UndefinedElem)
_ <- addPropertyIteration("file" -> AnyElem(File(Uuid.randomUuid, 100, 10, "test.txt")))
_ <- addPropertyIteration("person" -> AnyElem(Person("Smith", "Joe", 37)))
_ <- addPropertyIteration("matrix" -> AnyElem(Matrix.identity))
// grow the hierarchy
_ <- newIteration
_ <- newVersion
_ <- newRevision
_ <- newIteration
_ <- newVersion
_ <- newIteration
_ <- removePropertyIteration("pi")
_ <- addPropertyIteration("e" -> DoubleElem(2.71828))
_ <- removePropertyMaster("name")
_ <- deleteIteration
// jump back to an earlier position (index components are 1-based)
_ <- selectIndex((2, 1, 1))
// NOTE(review): "name" is removed twice in a row from the iteration — presumably exercising
// removal of an absent property; confirm.
_ <- removePropertyIteration("name")
_ <- removePropertyIteration("name")
_ <- newIteration
_ <- addPropertyIteration("name" -> StringElem("this is my new name."))
_ <- newVersion
_ <- selectIndex((1, 1, 2))
_ <- selectLatest
yield ()
for
_ <- createMaster(uuid)
// NOTE(review): presumably one run plus 1999 repetitions of `modify` — confirm against ZPure.repeatN.
_ <- modify.repeatN(1999)
yield ()
// using Transition to recreate the AggregateRoot
import syntax.*
/** Reads the facts stored for the aggregate root `uuid` and folds them into a replay program.
  *
  * Reading and folding both happen inside a single `ZIO.scoped` region.
  */
def readById(uuid: Uuid) =
  ZIO.scoped(
    for
      facts <- eventstore.readFactsByAggregateRootId(uuid)
      replay <- aggregate(facts)
    yield replay
  )
/** Folds a stream of raw values into one program that replays their events in stream order.
  *
  * Each raw value is decoded through `transport.fromRawValue`; the fold starts from a program
  * that sets `Instance.empty` and appends one `transition` step per decoded value.
  * The resulting program is only built here, not run.
  */
def aggregate(
  facts: RawValueStream
)(using transition: Transition): ZIO[Any, StreamingError, ZPure[Nothing, Instance, Instance, Any, Error, Unit]] =
  // Starting point of the replay: an empty instance, typed with the domain error.
  val initial = setWithError(Instance.empty)
  facts
    .mapZIO(raw => transport.fromRawValue(raw))
    .runFold(initial)((replayed, value) =>
      replayed.flatMap(_ => transition(value.event, value.identified.latest))
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment