Skip to content

Instantly share code, notes, and snippets.

@guidoschmidt17
Last active March 21, 2023 16:29
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save guidoschmidt17/b3fa1ca797d37c2af5a40a85c0bdbeba to your computer and use it in GitHub Desktop.
Save guidoschmidt17/b3fa1ca797d37c2af5a40a85c0bdbeba to your computer and use it in GitHub Desktop.
import zio.*
import zio.prelude.*
import zio.prelude.fx.*
/** Generic helpers for `ZPure` programs whose state type `S` is the same before and after running. */
object BaseSyntax:

  /** Program over state `S` and environment `R` that may fail with `E`; its log type is `Nothing`. */
  type Program[S, R, E, A] = ZPure[Nothing, S, S, R, E, A]

  /** Same shape as [[Program]], but with log entries of type `W`. */
  type LoggedProgram[W, S, R, E, A] = ZPure[W, S, S, R, E, A]

  /** Program that succeeds with `a`. */
  def pure[S, R, A](a: A): Program[S, R, Nothing, A] = ZPure.succeed(a)

  /** Program that succeeds with `()`. */
  def unit[S, R]: Program[S, R, Nothing, Unit] = ZPure.unit

  /** Program that fails with `t`. */
  def raiseError[S, R, E](t: => E): Program[S, R, E, Nothing] = ZPure.fail(t)

  /** Succeeds with `()` when `cond` holds, otherwise fails with `t`. */
  def assertThat[S, R, E](cond: => Boolean, t: => E): Program[S, R, E, Unit] =
    if !cond then raiseError(t) else unit

  /** Negated form of [[assertThat]]: fails with `t` when `cond` holds. */
  def assertThatNot[S, R, E](cond: => Boolean, t: => E): Program[S, R, E, Unit] =
    assertThat(!cond, t)

  /** Succeeds with the content of `a` when it is defined, otherwise fails with `t`. */
  def extractOption[S, R, E, A](a: Option[A], t: => E): Program[S, R, E, A] =
    a.fold[Program[S, R, E, A]](raiseError[S, R, E](t))(value => pure[S, R, A](value))

  /** Reads the current state. */
  def get[S, R]: Program[S, R, Nothing, S] = ZPure.get[S]

  /** Replaces the current state with `s`. */
  def set[S, R](s: S): Program[S, R, Nothing, Unit] = EState.set(s)

  /** Like the state-setting `set`, but typed with an error channel `E` chosen by the caller. */
  def setWithError[S, R, E](s: S): Program[S, R, E, Unit] = EState.set(s)

  /** Replaces the current state with the result of applying `f` to it. */
  def update[S, R](f: S => S): Program[S, R, Nothing, Unit] = EState.update(f)

  /** Reads the current state and returns `f` applied to it. */
  def inspect[S, R, A](f: S => A): Program[S, R, Nothing, A] = get[S, R].map(f)

  /** Obtains the service `R` from the environment and returns `f` applied to it. */
  def inquire[S, R: Tag, A](f: R => A): Program[S, R, Nothing, A] = ZPure.service[S, R].map(f)

  /** Appends `w` to the log. */
  def log[W, S, R, E](w: W): LoggedProgram[W, S, R, E, Unit] = ZPure.log(w)

  /** Logs every entry of `previouslog`, one `log` call per entry. */
  def set[W, S, R](previouslog: Chunk[W]): LoggedProgram[W, S, R, Nothing, Unit] =
    ZPure.forEach(previouslog)(entry => log(entry)).unit

  /** Logs the entries of `previouslog` and then sets the state to `previousstate`. */
  def restore[W, S, R](previouslog: Chunk[W], previousstate: S): LoggedProgram[W, S, R, Nothing, Unit] =
    // The explicit type arguments pick the intended `set` overload by type-parameter count.
    set[W, S, R](previouslog) *> set[S, R](previousstate)
package cqrs
package domain
package mastermanagement
import zio.*
import eventstore.*
import Error.*
import Error.ValidationError.*
import Event.*
import Fact.*
import Transport.*
final class Syntax private (transport: Transport):
// Program over the `Instance` state with environment `Any` and error type `Error`; the underlying
// BaseSyntax.Program has log type `Nothing`.
type Program[A] = BaseSyntax.Program[Instance, Any, Error, A]
// Same as `Program`, but with a log of `Value` entries.
type LoggedProgram[A] = BaseSyntax.LoggedProgram[Value, Instance, Any, Error, A]
// The members below pin the generic BaseSyntax helpers to S = Instance and R = Any.
val get = BaseSyntax.get[Instance, Any]
// BaseSyntax.set is overloaded; two type arguments match the state-setting variant (the log variant takes three).
val set = BaseSyntax.set[Instance, Any]
val update = BaseSyntax.update[Instance, Any]
// Variant of the state-setting `set` whose error type is fixed to `Error`.
def setWithError = BaseSyntax.setWithError[Instance, Any, Error]
// Kept as a `def` because it still takes the result type `A` as a type parameter.
def inspect[A] = BaseSyntax.inspect[Instance, Any, A]
import BaseSyntax.*
/** Applies `event` to the current state through `transition` and logs the outcome.
  *
  * The transition is invoked with the current `master.latest` plus one; afterwards the updated
  * `master` is logged together with `event` as a `Value`.
  */
def lift(event: Event)(using transition: Transition): LoggedProgram[Unit] =
  inspect(_.master.latest)
    .flatMap(current => transition(event, current + 1))
    .flatMap(_ => inspect(_.master))
    .flatMap(master => log(Value(master, event)))
/** Takes a snapshot of `instance` when the given batch of raw values is heavy enough.
  *
  * The weight of the batch is the sum of `eventData.length + MinSnapshotEventSize` over `raws`,
  * leaving out `Created` values whose `aggregateLatest` is greater than 1. When the weight exceeds
  * `DefaultSnapshotThreshold`, a `SnapshotTaken` event built from the current master and index is
  * lifted into the program.
  *
  * @param raws     raw values used to compute the weight
  * @param instance state the snapshot program starts from
  * @return the values logged by the snapshot program (only `lift` logs, so at most one) together
  *         with the resulting state; fails with the program's cause if the program fails
  */
private def snapshot(raws: RawValues, instance: Instance): ZIO[Any, Any, (Values, Instance)] =
  // The weight depends only on `raws`, so it is computed once, outside the state program.
  val weight = raws.foldLeft(0) { (sum, raw) =>
    if raw.eventCategory == Category.Created && raw.aggregateLatest > 1 then sum
    else sum + raw.eventData.length + MinSnapshotEventSize
  }
  val withsnapshot = for
    _ <- set(instance)
    _ <-
      if weight > DefaultSnapshotThreshold then
        for
          m <- inspect(_.master)
          currentindex <- inspect(_.index)
          _ <- lift(SnapshotTaken(m.snapshot(currentindex)))
        yield ()
      else unit
  yield ()
  // NOTE(review): the failure value is the ZPure cause itself, whereas runFactsEither converts
  // it with `toCause` — confirm which form callers expect.
  val outcome: ZIO[Any, Any, (Values, Instance)] =
    withsnapshot.runAll(instance) match
      case (logged, Right((state, _))) => ZIO.succeed((logged, state))
      case (_, Left(cause)) => ZIO.fail(cause)
  // Diagnostic output goes through the ZIO logger as part of the returned effect instead of an
  // eager `println` inside the pure program.
  ZIO.logDebug(s"weight $weight $DefaultSnapshotThreshold") *> outcome
extension (program: LoggedProgram[Unit])
  /** Runs the program from `Instance.empty`.
    *
    * On success returns the logged values and the final state. On failure returns an empty log
    * together with the failure converted via `toCause`.
    */
  def runFactsEither: (Values, Either[zio.Cause[Error], Instance]) =
    val (facts, outcome) = program.runAll(Instance.empty)
    outcome match
      case Left(cause) => (Chunk.empty, Left(cause.toCause))
      case Right((instance, _)) => (facts, Right(instance))

  /** Runs the program, converts the logged values to raw values and possibly adds a snapshot.
    *
    * The raw value of the snapshot is appended only when `snapshot` logged exactly one value.
    *
    * @param tags passed to `transport.toRawValue` for every converted value
    */
  def runFacts(tags: Tags): ZIO[Any, Any, (RawValues, Instance)] =
    runFactsEither match
      case (_, Left(cause)) => ZIO.fail(cause)
      case (facts, Right(instance)) =>
        for
          raws <- facts.mapZIO(fact => transport.toRawValue(fact, tags))
          (snapshotValues, updated) <- snapshot(raws, instance)
          withsnapshot <-
            if snapshotValues.size == 1 then transport.toRawValue(snapshotValues(0), tags).map(raw => raws :+ raw)
            else ZIO.succeed(raws)
        yield (withsnapshot, updated)
/** Given instance supplying the [[Transition]] required by `using` clauses such as the one on `lift`. */
final given instanceTransition: Transition = new Transition
/** Applies events to the current `Instance` state. */
final class Transition:
  /** Applies `event` to the current state after checking that it arrives in sequence.
    *
    * @param event  the event to apply
    * @param latest sequence number carried by the event; unless the event is a `SnapshotTaken`,
    *               it must be exactly one greater than the current `master.latest`
    * @return a program that fails with `EventNotInSequence` when the sequence check fails
    */
  def apply(event: Event, latest: Int): Program[Unit] =
    import Entity.EntityHolder
    for
      previous <- inspect(_.master.latest)
      _ <- event match
        // Snapshots are exempt from the sequence check.
        case SnapshotTaken(_) => unit
        // Every other event, `Created` included, must directly follow the previous one.
        case _ =>
          assertThat(latest - previous == 1, EventNotInSequence(s"expected ${previous + 1}, but found ${latest}, event ${event}"))
      _ <- event match
        case Nested(Nested(Nested(nested))) =>
          ...
    yield ()
  ...
end Syntax
/** Companion of the `Syntax` class; holds its layer wiring. */
object Syntax:
  /** Layer that builds a `Syntax` from the `Transport` service found in the environment. */
  val layer = ZLayer.fromZIO(makeLayer)

  // Fetches the Transport service and wraps it in a new Syntax.
  private def makeLayer =
    ZIO.service[Transport].map(transport => Syntax(transport))
// example usage of domain "mastermanagement"
/** Sample payload type; the example program stores one as an `AnyElem` property. */
case class Person(
  lastname: String,
  firstname: String,
  age: Int
)
given Codec[Person] = deriveCodec[Person]
/** Sample payload type made of twelve `Double` coefficients, `m1` through `m12`. */
case class Matrix(
  m1: Double, m2: Double, m3: Double, m4: Double,
  m5: Double, m6: Double, m7: Double, m8: Double,
  m9: Double, m10: Double, m11: Double, m12: Double
)
given Codec[Matrix] = deriveCodec[Matrix]
object Matrix:
  // Ones at m1, m5 and m9, zeros everywhere else.
  // NOTE(review): presumably a 3x3 identity followed by a zero translation — confirm the layout.
  final val identity = Matrix(
    m1 = 1.0, m2 = 0.0, m3 = 0.0,
    m4 = 0.0, m5 = 1.0, m6 = 0.0,
    m7 = 0.0, m8 = 0.0, m9 = 1.0,
    m10 = 0.0, m11 = 0.0, m12 = 0.0
  )
import Properties.*
import Properties.given
// Example program: creates a master for `uuid` and then runs a fixed editing script against it
// repeatedly. The individual operations (addProperty*, new*, select*, ...) are defined outside this snippet.
def test(uuid: Uuid): LoggedProgram[Unit] =
// NOTE(review): `toList.toString` renders the characters as "List(a, b, ...)", not as a plain
// 200-character string — confirm this is the intended payload.
val largebytes = util.Random.alphanumeric.take(200).toList.toString.nn
// One pass of the editing script; the steps run strictly in the order listed.
val modify =
for
// Properties at master, version and revision level.
_ <- addPropertyMaster("name" -> StringElem("this is my name."))
_ <- addPropertyMaster("description" -> StringElem("this is my description."))
_ <- addPropertyVersion("creator" -> StringElem("Bob"))
_ <- addPropertyRevision("size" -> IntElem(4711))
// Iteration-level properties, one per element kind used in this example.
_ <- addPropertyIteration("pi" -> DoubleElem(3.14159))
_ <- addPropertyIteration("somebytes" -> ByteArrayElem("blablabla".getBytes.nn))
_ <- addPropertyIteration("largebytes" -> StringElem(largebytes))
_ <- addPropertyIteration("happy" -> BooleanElem(true))
_ <- addPropertyIteration("noidea" -> NullElem)
_ <- addPropertyIteration("dontcare" -> UndefinedElem)
_ <- addPropertyIteration("file" -> AnyElem(File(Uuid.randomUuid, 100, 10, "test.txt")))
_ <- addPropertyIteration("person" -> AnyElem(Person("Smith", "Joe", 37)))
_ <- addPropertyIteration("matrix" -> AnyElem(Matrix.identity))
// Structural changes: new iterations, versions and revisions.
_ <- newIteration
_ <- newVersion
_ <- newRevision
_ <- newIteration
_ <- newVersion
_ <- newIteration
_ <- removePropertyIteration("pi")
_ <- addPropertyIteration("e" -> DoubleElem(2.71828))
_ <- removePropertyMaster("name")
_ <- deleteIteration
_ <- selectIndex((2, 1, 1))
// NOTE(review): "name" was only ever added at master level above, and it is removed here twice at
// iteration level — presumably a deliberate check of removing an absent property; confirm.
_ <- removePropertyIteration("name")
_ <- removePropertyIteration("name")
_ <- newIteration
_ <- addPropertyIteration("name" -> StringElem("this is my new name."))
_ <- newVersion
_ <- selectIndex((1, 1, 2))
_ <- selectLatest
yield ()
for
_ <- createMaster(uuid)
// NOTE(review): presumably the script runs 2000 times in total (one run plus 1999 repeats) —
// confirm against the ZPure `repeatN` documentation.
_ <- modify.repeatN(1999)
yield ()
// using Transition to recreate the AggregateRoot
import syntax.*
/** Reads the stored facts of the aggregate root `uuid` and turns them into a replay program.
  *
  * The read and the fold performed by `aggregate` both happen inside one `ZIO.scoped` region.
  */
def readById(uuid: Uuid) =
  ZIO.scoped {
    for
      facts <- eventstore.readFactsByAggregateRootId(uuid)
      replay <- aggregate(facts)
    yield replay
  }
/** Folds a stream of raw values into a single program that replays them.
  *
  * Each raw value is decoded with `transport.fromRawValue`. The resulting program starts by
  * setting the state to `Instance.empty` and then applies `transition` to every decoded value,
  * in stream order, using the value's event and `identified.latest`.
  *
  * @param facts the raw values to replay
  * @return an effect producing the (not yet run) replay program
  */
def aggregate(
  facts: RawValueStream
)(using transition: Transition): ZIO[Any, StreamingError, ZPure[Nothing, Instance, Instance, Any, Error, Unit]] =
  val start = setWithError(Instance.empty)
  val decoded = facts.mapZIO(raw => transport.fromRawValue(raw))
  decoded.runFold(start) { (replayed, value) =>
    replayed.flatMap(_ => transition(value.event, value.identified.latest))
  }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment