Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Event Sourcing with Free Monads
package eventsourcing
import java.time.Instant
import cats._
import cats.data.Coproduct
import cats.free.{Free, Inject}
import cats.implicits._
import doobie.imports._
import fs2.Stream
import io.circe.{Decoder, Encoder, Json}
/**
  * Mix-in for the instructions of an algebra `F`.
  *
  * The self-type requires every implementor to be an `F[A]` itself, which is what
  * allows `this` to be lifted into `Free` below.
  *
  * @tparam F the algebra (instruction set) this operation belongs to
  * @tparam A the result type of the operation
  */
trait FreeOp[F[_], A] { this: F[A] =>

  /** Lifts this single instruction into a `Free[F, A]` program. */
  def liftF: Free[F, A] = Free.liftF[F, A](this)

  /**
    * Lifts this instruction into a `Free` program over a larger algebra `G`.
    *
    * The type arguments are passed explicitly: `Free.inject[F, G]` is partially
    * applied and takes no value arguments from which they could be inferred.
    *
    * @tparam G the target algebra
    * @param I evidence that `F` can be injected into `G`
    */
  def inject[G[_]](implicit I: Inject[F, G]): Free[G, A] = Free.inject[F, G](this)
}
/** Doobie statements and programs operating on the `event` table. */
object Queries {

  /** One row of the `event` table, with the columns selected by `streamAllQ`. */
  final case class DBEvent(id: Int, created: Instant, payload: Json)

  /**
    * Insert statement that stores the JSON encoding of `event` in the `payload` column.
    *
    * Visibility is restricted to the enclosing `eventsourcing` package; an access
    * qualifier has to name an enclosing package or class.
    */
  private[eventsourcing] def appendQ[A](event: A)(implicit enc: Encoder[A]) =
    sql"insert into event(payload) values (${enc(event)})"

  /**
    * Appends `event` to the log and returns the row that was written.
    *
    * The row is read back through the generated-keys mechanism so that the result
    * really is a `DBEvent` (a plain `update.run` only yields the affected-row count).
    * NOTE(review): assumes `id` and `created` are filled in by the database, since
    * the insert only supplies `payload` -- confirm against the table definition.
    */
  def append[A: Encoder](event: A): ConnectionIO[DBEvent] =
    appendQ(event).update.withUniqueGeneratedKeys[DBEvent]("id", "created", "payload")

  /** Query selecting every row of the `event` table. */
  private[eventsourcing] val streamAllQ: Query0[DBEvent] =
    sql"select id, created, payload from event".query[DBEvent]

  /** All stored events as a stream of `ConnectionIO` effects. */
  val streamAll: Stream[ConnectionIO, DBEvent] =
    streamAllQ.process
}
trait EventSourcing {
import Queries._
// Abstract Types
// The event type that is written to and read back from the event log as JSON.
type Event
// The algebra (instruction set) whose operations are to be logged and replayed.
type Algebra[_]
// Abstract Fields
// JSON encoder for `Event`; needed when an event is appended to the log.
implicit def encoder: Encoder[Event]
// JSON decoder for `Event`; `playback` uses it to decode stored payloads.
implicit def decoder: Decoder[Event]
// Maps an event to the `Algebra` operation that `playback` lifts into `Free` and runs.
def eventToAlgebra(e: Event): FreeOp[Algebra, _]
// EventLogOp algebra for event logging
sealed trait EventLogOp[A] extends FreeOp[EventLogOp, A] with Product with Serializable
// The only instruction of the algebra: append `event` to the log, producing `Unit`.
final case class Append(event: Event) extends EventLogOp[Unit]
// Short alias for the abstract `Algebra`.
type F[A] = Algebra[A]
// Free programs made of event-log instructions only.
type EventLogFree[A] = Free[EventLogOp, A]
// Coproduct of the two algebras: an instruction is either an `F` or an `EventLogOp`.
type C[A] = Coproduct[F, EventLogOp, A]
// Free programs over the combined algebra `C`.
type FreeC[A] = Free[C, A]
/** Natural transformations that interpret `EventLogOp` and add event logging to `F`. */
object interpreters {

  /** The `EventLogOp` to `ConnectionIO` interpreter. */
  val eventLogOp2ConIO: EventLogOp ~> ConnectionIO = new (EventLogOp ~> ConnectionIO) {
    override def apply[A](fa: EventLogOp[A]): ConnectionIO[A] = fa match {
      // `append` is declared to return a `ConnectionIO` already; only its result
      // is discarded so that the type lines up with `Append`'s `Unit`.
      case Append(e) => append(e).map(_ => ())
    }
  }

  /** `ConnectionIO` to a monad `M`, by running each program through `transactor`. */
  def conIO2M[M[_]](transactor: Transactor[M]): ConnectionIO ~> M = new (ConnectionIO ~> M) {
    override def apply[A](fa: ConnectionIO[A]): M[A] = transactor.trans(fa)
  }

  /** `EventLogOp` to a monad `M`: `eventLogOp2ConIO` followed by `conIO2M`. */
  def eventLog2M[M[_]](transactor: Transactor[M]): EventLogOp ~> M =
    eventLogOp2ConIO.andThen(conIO2M(transactor))

  // The interpreter that injects into the coproduct and adds an `Append` instruction:
  // every operation becomes "log it, then perform it".
  private val F2FreeC = new (F ~> FreeC) {
    override def apply[A](fa: F[A]): FreeC[A] = {
      fa match {
        // NOTE(review): `Event` is an abstract type, so this type test is unchecked
        // after erasure, and there is no fallback case for operations that are not
        // events -- confirm that every `F` operation is meant to be an `Event`.
        case e: Event =>
          Append(e).inject[C].flatMap(_ => Free.inject[F, C](fa))
      }
    }
  }

  /**
    * Any free algebra `F` to a monad `M`, with event logging.
    *
    * @param f2M  interpreter for the operations of `F`
    * @param el2M interpreter for the `Append` instructions added by `F2FreeC`
    */
  def F2M[M[_]](f2M: F ~> M, el2M: EventLogOp ~> M)(implicit M: Monad[M]): F ~> M =
    new (F ~> M) {
      override def apply[A](fa: F[A]): M[A] = {
        val logged = F2FreeC(fa)
        // `or` combines both interpreters into one for the coproduct `C`
        logged.foldMap(f2M or el2M)
      }
    }
}
/**
  * Replays the stored event log through the interpreter `f2M`.
  *
  * Events are read with `streamAll`, decoded, turned into `Algebra` operations via
  * `eventToAlgebra` and grouped into chunks of at most `chunkSize` operations. Each
  * chunk is interpreted into a single `M[Unit]`. The returned stream only emits
  * these `M` values; it does not run them.
  *
  * @param transactor moves the underlying `ConnectionIO` stream into `M`
  * @param chunkSize  upper bound on the number of events combined into one `M[Unit]`
  * @param f2M        interpreter for the operations of `F`
  */
def playback[M[_]](transactor: Transactor[M], chunkSize: Int)(f2M: F ~> M)(
    implicit M: Monad[M]): Stream[M, M[Unit]] = {
  // decode every stored payload; rows that fail to decode are dropped
  val decodedEvents: Stream[ConnectionIO, Event] =
    streamAll
      .map(row => decoder.decodeJson(row.payload))
      .collect { case Right(event) => event }

  // one `Free` program per event, with its result replaced by `Unit`
  val replayOps: Stream[ConnectionIO, Free[Algebra, Unit]] =
    decodedEvents.map(event => eventToAlgebra(event).liftF.map(_ => ()))

  // combine each chunk of programs into one sequential `Free` program
  val batches: Stream[ConnectionIO, Free[F, List[Unit]]] =
    replayOps
      .chunkLimit(chunkSize)
      .map(chunk => chunk.toList.sequence[Free[F, ?], Unit])

  // switch the stream's effect type from `ConnectionIO` to `M`
  val batchesInM: Stream[M, Free[F, List[Unit]]] =
    transactor.transP(batches)

  // interpret every batch with `f2M` and reduce its result to `Unit`
  batchesInM.map(batch => batch.foldMap(f2M).map(_ => ()))
}
}
@szoio

This comment has been minimized.

Copy link
Owner Author

szoio commented Apr 3, 2017

For a full description of this pattern, read my blog post series on free monads and event sourcing.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.