Created
February 23, 2017 11:17
-
-
Save szoio/b80a5c5fb8da00be5a2e5fd822b7895e to your computer and use it in GitHub Desktop.
Event Sourcing with Free Monads
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package eventsourcing | |
import java.time.Instant | |
import cats._ | |
import cats.data.Coproduct | |
import cats.free.{Free, Inject} | |
import cats.implicits._ | |
import doobie.imports._ | |
import fs2.Stream | |
import io.circe.{Decoder, Encoder, Json} | |
/**
 * Mixin for the operations of an algebra `F`.
 *
 * The self-type requires every implementor to itself be an `F[A]`, so the
 * operation value can be handed directly to the `Free` constructors below.
 *
 * @tparam F the algebra this operation belongs to
 * @tparam A the result type of the operation
 */
trait FreeOp[F[_], A] { self: F[A] =>

  /** Lifts this operation into `Free[F, A]`. */
  def liftF: Free[F, A] = Free.liftF[F, A](self)

  /**
   * Lifts this operation into the free monad of another algebra `G`.
   *
   * @param I evidence that `F` can be injected into `G`
   */
  def inject[G[_]](implicit I: Inject[F, G]): Free[G, A] = Free.inject[F, G](self)
}
/**
 * doobie queries over the `event` table that stores the event log.
 *
 * NOTE(review): `DBEvent` rows contain `Instant` and `Json` columns; the doobie
 * instances needed to read/write them are not defined in the visible part of
 * this file and are assumed to be in implicit scope — confirm.
 */
object Queries {

  /** One row of the `event` table, in the column order selected by `streamAllQ`. */
  final case class DBEvent(id: Int, created: Instant, payload: Json)

  /**
   * Builds the insert statement for a single event.
   *
   * Only `payload` is supplied (the event encoded to JSON with `enc`).
   * Visibility is scoped to the enclosing `eventsourcing` package.
   */
  private[eventsourcing] def appendQ[A](event: A)(implicit enc: Encoder[A]) =
    sql"insert into event(payload) values (${enc(event)})"

  /**
   * Inserts `event` and returns the row that was written.
   *
   * The insert supplies only `payload`, so `id` and `created` are read back
   * from the inserted row (presumably filled in by column defaults — confirm
   * against the schema). A plain `update.run` would only yield the affected-row
   * count, which does not match the declared `DBEvent` result.
   */
  def append[A: Encoder](event: A): ConnectionIO[DBEvent] =
    appendQ(event).update.withUniqueGeneratedKeys[DBEvent]("id", "created", "payload")

  /** Selects every row of the `event` table. */
  private[eventsourcing] val streamAllQ: Query0[DBEvent] =
    sql"select id, created, payload from event".query[DBEvent]

  /** All stored events as a stream in `ConnectionIO`. */
  val streamAll: Stream[ConnectionIO, DBEvent] =
    streamAllQ.process
}
/**
 * Adds event logging and playback to an algebra.
 *
 * Implementors supply the `Event` and `Algebra` types, JSON codecs for `Event`,
 * and `eventToAlgebra`. In return they get:
 *  - `interpreters.F2M`, which wraps an interpreter for the algebra so that an
 *    `Append` instruction is issued before each operation is interpreted;
 *  - `playback`, which reads the stored events back and re-runs them through
 *    an interpreter.
 */
trait EventSourcing {
  import Queries._

  // Abstract Types

  /** The event type that is written to, and read back from, the event log. */
  type Event

  /** The algebra whose operations are interpreted (and logged). */
  type Algebra[_]

  // Abstract Fields

  /** Encodes an `Event` to the JSON payload stored by `Queries.append`. */
  implicit def encoder: Encoder[Event]

  /** Decodes a stored JSON payload back into an `Event` during `playback`. */
  implicit def decoder: Decoder[Event]

  /** Maps a stored event to the algebra operation that `playback` re-runs. */
  def eventToAlgebra(e: Event): FreeOp[Algebra, _]

  // EventLogOp algebra for event logging
  sealed trait EventLogOp[A] extends FreeOp[EventLogOp, A] with Product with Serializable

  /** Instruction to append `event` to the event log. */
  final case class Append(event: Event) extends EventLogOp[Unit]

  type F[A] = Algebra[A]
  type EventLogFree[A] = Free[EventLogOp, A]
  // The algebra combined with the event-log algebra.
  type C[A] = Coproduct[F, EventLogOp, A]
  type FreeC[A] = Free[C, A]

  object interpreters {

    // The EventLog to ConnectionIO interpreter
    val eventLogOp2ConIO: EventLogOp ~> ConnectionIO = new (EventLogOp ~> ConnectionIO) {
      override def apply[A](fa: EventLogOp[A]): ConnectionIO[A] = fa match {
        // `append` already is a ConnectionIO; only its DBEvent result is discarded.
        case Append(e) => append(e).map(_ => ())
      }
    }

    // The ConnectionIO to a Monad `M`
    def conIO2M[M[_]](transactor: Transactor[M]): ConnectionIO ~> M = new (ConnectionIO ~> M) {
      override def apply[A](fa: ConnectionIO[A]): M[A] = transactor.trans(fa)
    }

    // `EventLogOp` to a Monad `M`
    def eventLog2M[M[_]](transactor: Transactor[M]): EventLogOp ~> M =
      eventLogOp2ConIO.andThen(conIO2M(transactor))

    // The interpreter that injects into the coproduct and adds an `Append` instruction.
    // The `Append` is sequenced before the operation itself.
    // NOTE(review): `Event` is an abstract type member, so this type pattern is
    // presumably unchecked after erasure, and the match has no fallback case
    // (a non-matching operation would raise a MatchError) — confirm that every
    // `F` operation reaching this interpreter really is an `Event`.
    private val F2FreeC = new (F ~> FreeC) {
      override def apply[A](fa: F[A]): FreeC[A] =
        fa match {
          case e: Event =>
            for {
              _ <- Append(e).inject[C]
              f <- Free.inject[F, C](fa)
            } yield f
        }
    }

    /**
     * Any Free algebra to a Monad `M` with event logging.
     *
     * @param f2M  interpreter for the algebra's own operations
     * @param el2M interpreter for the `EventLogOp` instructions
     * @return an interpreter that expands each operation with `F2FreeC` and
     *         folds the result using `f2M or el2M`
     */
    def F2M[M[_]](f2M: F ~> M, el2M: EventLogOp ~> M)(implicit M: Monad[M]): F ~> M =
      new (F ~> M) {
        override def apply[A](fa: F[A]): M[A] =
          F2FreeC(fa).foldMap(f2M or el2M)
      }
  }

  /**
   * Replays the stored events through `f2M`.
   *
   * @param transactor used to turn the `ConnectionIO` stream into a stream in `M`
   * @param chunkSize  limit passed to `chunkLimit`; the events of one chunk are
   *                   sequenced into a single program
   * @param f2M        interpreter used to run the replayed operations
   * @return a stream with one `M[Unit]` per chunk. The effects are emitted as
   *         stream elements; this method does not evaluate them.
   */
  def playback[M[_]](transactor: Transactor[M], chunkSize: Int)(f2M: F ~> M)(
      implicit M: Monad[M]): Stream[M, M[Unit]] = {

    val conIOStream: Stream[ConnectionIO, Free[Algebra, Unit]] =
      streamAll
        // convert the Json into an event
        .map { dbEvent =>
          decoder.decodeJson(dbEvent.payload).toOption
        }
        // discard events that fail to decode
        .collect { case Some(x) => x }
        // convert the events into operations of type Unit
        .map { event =>
          eventToAlgebra(event).liftF.map(_ => ())
        }

    val batchedStream: Stream[ConnectionIO, Free[F, List[Unit]]] =
      conIOStream
        .chunkLimit(chunkSize)
        // sequence the chunk to convert to a single Free instance
        .map(_.toList.sequence[Free[F, ?], Unit])

    // transform stream from ConnectionIOs to Ms
    val fStreamM: Stream[M, Free[F, List[Unit]]] =
      transactor.transP(batchedStream)

    // interpret the stream of `Free[F, ?]`s to create a stream of `M`s
    val mStreamM: Stream[M, M[List[Unit]]] =
      fStreamM.map(_.foldMap(f2M))

    // simplify the return type to `Unit`
    mStreamM.map(_.map(_ => ()))
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
For a full description of this pattern, read my blog post series on free monads and event sourcing.