Skip to content

Instantly share code, notes, and snippets.

@ivanobulo
Forked from szoio/EventSourcing.scala
Last active March 26, 2018 06:27
Show Gist options
  • Save ivanobulo/41ce9d1d699e3e8d2ec47ccf92fdec56 to your computer and use it in GitHub Desktop.
Save ivanobulo/41ce9d1d699e3e8d2ec47ccf92fdec56 to your computer and use it in GitHub Desktop.
Event Sourcing with Free Monads
package eventsourcing
import java.time.Instant
import cats._
import cats.data.EitherK
import cats.free.{Free, InjectK}
import cats.implicits._
import doobie.imports._
import fs2.Stream
import io.circe.{Decoder, Encoder, Json}
trait FreeOp[F[_], A] { this: F[A] =>
def liftF: Free[F, A] = Free.liftF(this)
def inject[G[_]](implicit I: InjectK[F, G]): Free[G, A] = Free.inject(this)
}
object Queries {
final case class DBEvent(id: Int, created: Instant, payload: Json)
private[pg] def appendQ[A](event: A)(implicit enc: Encoder[A]) =
sql"insert into event(payload) values (${enc(event)})"
def append[A: Encoder](event: A): ConnectionIO[DBEvent] =
appendQ(event).update.run
private[pg] val streamAllQ: Query0[DBEvent] =
sql"select id, created, payload from event".query[DBEvent]
val streamAll: Stream[ConnectionIO, DBEvent] =
streamAllQ.process
}
trait EventSourcing {
import Queries._
// Abstract Types
type Event
type Algebra[_]
// Abstract Fields
implicit def encoder: Encoder[Event]
implicit def decoder: Decoder[Event]
def eventToAlgebra(e: Event): FreeOp[Algebra, _]
// EventLogOp algebra for event logging
sealed trait EventLogOp[A] extends FreeOp[EventLogOp, A] with Product with Serializable
final case class Append(event: Event) extends EventLogOp[Unit]
type F[A] = Algebra[A]
type EventLogFree[A] = Free[EventLogOp, A]
type C[A] = EitherK[F, EventLogOp, A]
type FreeC[A] = Free[C, A]
object interpreters {
// The EventLog to ConnectionIO interpreter
val eventLogOp2ConIO = new (EventLogOp ~> ConnectionIO) {
override def apply[A](fa: EventLogOp[A]): ConnectionIO[A] = fa match {
case Append(e) => append(e).run.map(_ => ())
}
}
// The ConnectionIO to a Monad `M`
def conIO2M[M[_]](transactor: Transactor[M]): ConnectionIO ~> M = new (ConnectionIO ~> M) {
override def apply[A](fa: ConnectionIO[A]): M[A] = transactor.trans(fa)
}
// `EventLogOp` to a Monad `M`
def eventLog2M[M[_]](transactor: Transactor[M]): EventLogOp ~> M =
eventLogOp2ConIO.andThen(conIO2M(transactor))
// The interpreter that injects into the coproduct and adds an `Append` instruction
private val F2FreeC = new (F ~> FreeC) {
override def apply[A](fa: F[A]): FreeC[A] = {
fa match {
case e: Event =>
for {
_ <- Append(e).inject[C]
f <- Free.inject[F, C](fa)
} yield f
}
}
}
// Any Free algebra to to a Monad `M` with event logging
def F2M[M[_]](f2M: F ~> M, el2M: EventLogOp ~> M)(implicit M: Monad[M]): F ~> M =
new (F ~> M) {
override def apply[A](fa: F[A]): M[A] = {
val v = F2FreeC(fa)
v.foldMap(f2M or el2M)
}
}
}
def playback[M[_]](transactor: Transactor[M], chunkSize: Int)(f2M: F ~> M)(
implicit M: Monad[M]): Stream[M, M[Unit]] = {
val conIOStream: Stream[ConnectionIO, Free[Algebra, Unit]] =
streamAll
// convert the Json into an event
.map { dbEvent =>
decoder.decodeJson(dbEvent.payload).toOption
}
// discard events that fail to decode
.collect { case Some(x) => x }
// convert the events into operations of type Unit
.map { event =>
eventToAlgebra(event).liftF.map(_ => ())
}
val batchedStream: Stream[ConnectionIO, Free[F, List[Unit]]] =
conIOStream
.chunkLimit(chunkSize)
// sequence the chunk to convert to a single Free instance
.map(_.toList.sequence[Free[F, ?], Unit])
// transform stream from ConnectionIOs to Ms
val fStreamM: Stream[M, Free[F, List[Unit]]] =
transactor.transP(batchedStream)
// interpret the stream of `Free[F, ?]`s to create a stream of `M`s
val mStreamM: Stream[M, M[List[Unit]]] =
fStreamM.map(_.foldMap(f2M))
// simplify the return type to `Unit`
mStreamM.map(_.map(_ => ()))
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment