Skip to content

Instantly share code, notes, and snippets.

@Tvaroh
Last active August 13, 2016 08:10
Show Gist options
  • Save Tvaroh/f1500b036ecb2b721f3e31862fc1b037 to your computer and use it in GitHub Desktop.
Save Tvaroh/f1500b036ecb2b721f3e31862fc1b037 to your computer and use it in GitHub Desktop.
package io.treev.eventsourcing
import cats.data.Xor
import enumeratum.{CirceEnum, Enum, EnumEntry}
import io.circe.Encoder
import io.circe.generic.auto._
import io.treev.eventsourcing.model.Context
import io.treev.model._
import monix.eval.Task
object eventsourcing {
/** Result of executing a command: a `Task` yielding either a failure `F` or a
  * success `S`, paired with the list of events emitted during execution.
  * @tparam F failure type
  * @tparam S success type */
type CommandResult[F, S] = Task[(F Xor S, List[EmittedEvent[_]])]

/** Constructors for already-computed [[CommandResult]] values. */
object CommandResult {

  /** Builds a result that succeeded with `s`.
    * @param s success value
    * @param events events emitted alongside the success */
  def successful[F, S](s: S, events: EmittedEvent[_]*): CommandResult[F, S] =
    completed[F, S](Xor.right(s), events)

  /** Builds a result that failed with `f`.
    * @param f failure value
    * @param events events emitted alongside the failure */
  def failed[F, S](f: F, events: EmittedEvent[_]*): CommandResult[F, S] =
    completed[F, S](Xor.left(f), events)

  /** Wraps an already-known outcome and its events into an immediately available task. */
  private def completed[F, S](outcome: F Xor S, events: Seq[EmittedEvent[_]]): CommandResult[F, S] =
    Task.now((outcome, events.toList))
}
/** Encodes evidence that a command `C` can emit event `E`.
  * Instances carry no data; only their type matters, and they are required
  * implicitly by `CommandHandler.emit` / `CommandHandler.emitOne`.
  * @tparam C command
  * @tparam E event */
trait CommandEmitsEvent[C, E] {
  /** This evidence re-exposed as an implicit member.
    * Points at the instance itself so it is never `null`. */
  implicit val ev: CommandEmitsEvent[C, E] = this
}
object CommandEmitsEvent {
  /** Creates evidence that command `C` may emit event `E`.
    * Returns a real marker instance rather than `null`, so the evidence can be
    * safely passed around, compared or inspected.
    * @tparam C command
    * @tparam E event */
  def apply[C, E]: CommandEmitsEvent[C, E] = new CommandEmitsEvent[C, E] {}
}
/** Encodes evidence that a listener for event `E` can emit event `E2`.
  * Instances carry no data; only their type matters, and they are required
  * implicitly by `EventListener.emit` / `EventListener.emitOne`.
  * @tparam E listening event
  * @tparam E2 emitting event */
trait EventListenerEmitsEvent[E, E2] {
  /** This evidence re-exposed as an implicit member.
    * Points at the instance itself so it is never `null`. */
  implicit val ev: EventListenerEmitsEvent[E, E2] = this
}
object EventListenerEmitsEvent {
  /** Creates evidence that a listener of `E` may emit `E2`.
    * Returns a real marker instance rather than `null`, so the evidence can be
    * safely passed around, compared or inspected.
    * @tparam E listening event
    * @tparam E2 emitting event */
  def apply[E, E2]: EventListenerEmitsEvent[E, E2] = new EventListenerEmitsEvent[E, E2] {}
}
/** An event that has been emitted by a command handler or an event listener,
  * together with a callback task to run once the event has been handled.
  * The constructor is restricted to `eventsourcing`, so instances are obtained
  * through `CommandHandler.emit` / `EventListener.emit` rather than built directly.
  * @tparam E underlying event */
class EmittedEvent[E] private[eventsourcing](_event: E,
                                             _handleCallback: Task[Unit] = Task.unit) {

  /** The wrapped event. */
  private[eventsourcing] def event: E = _event

  /** The callback task accumulated so far (defaults to `Task.unit`). */
  private[eventsourcing] def handleCallback: Task[Unit] = _handleCallback

  /** Register a callback to execute after all handlers for this event have run.
    * The new callback is combined with the existing one through `Task.zipList`,
    * so chained callbacks are potentially executed concurrently.
    * Note that `cb` is evaluated once, at the time this method is called.
    * @param cb callback
    * @return a new emitted event wrapping the same event with the combined callback */
  def whenHandled(cb: => Task[Unit]): EmittedEvent[E] = {
    val callbacks = List(handleCallback, cb)
    val combined = Task.zipList(callbacks).map(_ => ())
    new EmittedEvent[E](_event, combined)
  }
}
/** Contract for handling a command of type `C`.
  * @tparam C command
  * @tparam F failure type
  * @tparam S success type */
trait CommandHandler[C, F, S] {

  /** Handle the given command.
    * @param command command to handle
    * @param ctx context
    * @return command result - success `S` or failure `F` with a list of emitted events */
  def handle(command: C)
            (implicit ctx: Context): CommandResult[F, S]

  /** Wrap `event` as an emitted event.
    * Only compiles when evidence exists that command `C` may emit `E`.
    * @param event event to emit */
  def emit[E](event: E)(implicit ev: CommandEmitsEvent[C, E]): EmittedEvent[E] =
    new EmittedEvent[E](event)

  /** Wrap `event` as a single-element list of emitted events.
    * @param event event to emit */
  def emitOne[E](event: E)(implicit ev: CommandEmitsEvent[C, E]): List[EmittedEvent[E]] =
    emit(event) :: Nil

  /** A task yielding an empty list of emitted events. */
  def emitNone: Task[List[EmittedEvent[_]]] = CommandHandler.noop
}

private object CommandHandler {
  /** Shared, already-completed task holding no events. */
  val noop: Task[List[EmittedEvent[_]]] = Task.now(List.empty[EmittedEvent[_]])
}
/** Event handler interface: describes the work to perform for an event of type `E`.
* @tparam E type of event this handler accepts */
trait EventHandler[E] {
/** Handle the given event.
* @param event event to handle
* @return a `Task[Unit]` describing the handling work */
def handle(event: E): Task[Unit]
}
/** Contract for reacting to an event of type `E`, optionally emitting further events.
  * @tparam E event being listened to */
trait EventListener[E] {

  /** Callback for an event.
    * Intended to run after the event has been handled by an `EventHandler`
    * (the dispatching code is not part of this file).
    * @param event event
    * @return task yielding the additional events to emit */
  def onEvent(event: E): Task[List[EmittedEvent[_]]]

  /** Wrap `event` as an emitted event.
    * Only compiles when evidence exists that a listener of `E` may emit `E2`.
    * @param event event to emit */
  def emit[E2](event: E2)(implicit ev: EventListenerEmitsEvent[E, E2]): EmittedEvent[E2] =
    new EmittedEvent[E2](event)

  /** Wrap `event` as a single-element list of emitted events.
    * @param event event to emit */
  def emitOne[E2](event: E2)(implicit ev: EventListenerEmitsEvent[E, E2]): List[EmittedEvent[E2]] =
    emit(event) :: Nil

  /** A task yielding an empty list of emitted events. */
  def emitNone: Task[List[EmittedEvent[_]]] = EventListener.noop
}

private object EventListener {
  /** Shared, already-completed task holding no events. */
  val noop: Task[List[EmittedEvent[_]]] = Task.now(List.empty[EmittedEvent[_]])
}
// commands

/** Command requesting a new session for the given credentials.
  * Handled by `CreateSessionHandler`.
  * @param username name of the user to authenticate
  * @param password password supplied with the request */
case class CreateSession(username: Username, password: String)
object CreateSession {
  // Implicit definitions carry explicit types: untyped implicits are fragile to
  // resolve in Scala 2 (they depend on definition order) and rejected by Scala 3.

  /** Evidence that `CreateSession` may emit `SessionCreated`. */
  implicit val emitsSessionCreated: CommandEmitsEvent[CreateSession, SessionCreated] =
    CommandEmitsEvent[CreateSession, SessionCreated]

  /** Evidence that `CreateSession` may emit `SessionNotCreated`. */
  implicit val emitsSessionNotCreated: CommandEmitsEvent[CreateSession, SessionNotCreated] =
    CommandEmitsEvent[CreateSession, SessionNotCreated]
}
// events
/** Event emitted by `CreateSessionHandler` when authentication succeeds.
* @param userId id of the authenticated user
* @param sessionId id of the session returned by authentication */
case class SessionCreated(userId: UserId, sessionId: SessionId)
/** Event emitted by `CreateSessionHandler` when authentication fails.
  * NOTE(review): this event carries the submitted password in plain text; if
  * events are persisted or logged this is likely sensitive - confirm.
  * @param username username supplied with the command
  * @param password password supplied with the command
  * @param reason why authentication failed
  * @param userId id of the user, when one was returned by authentication */
case class SessionNotCreated(username: Username,
                             password: String,
                             reason: AuthFailureReason,
                             userId: Option[UserId])
object SessionNotCreated {
  /** Evidence that a listener of `SessionNotCreated` may emit `AuthFailureNotificationSent`.
    * Explicitly typed: untyped implicits are fragile to resolve in Scala 2 and rejected by Scala 3. */
  implicit val emitsAuthFailureNotificationSent: EventListenerEmitsEvent[SessionNotCreated, AuthFailureNotificationSent] =
    EventListenerEmitsEvent[SessionNotCreated, AuthFailureNotificationSent]
}
/** Event emitted by `SessionNotCreatedListener` after the failed-authentication email task for `userId` completes.
* @param userId id of the notified user */
case class AuthFailureNotificationSent(userId: UserId)
/** Reason why authentication failed; an enumeratum `EnumEntry`. */
sealed abstract class AuthFailureReason extends EnumEntry {
/** Code of this reason; returns the entry's `entryName`. */
def code: String = entryName
}
/** Companion listing the failure reasons; mixes in enumeratum's `CirceEnum` and `Enum`. */
object AuthFailureReason extends CirceEnum[AuthFailureReason] with Enum[AuthFailureReason] {
// Populated by enumeratum's `findValues`; presumably yields the case objects
// declared below - confirm against the enumeratum version in use.
override val values: Seq[AuthFailureReason] = findValues
/** Failure reason member `WrongUsername`. */
case object WrongUsername extends AuthFailureReason
/** Failure reason member `WrongPassword`. */
case object WrongPassword extends AuthFailureReason
}
// responses
/** Reply sent via `ConnectionApi.reply` by `CreateSessionHandler` once the `SessionCreated` event is handled.
* @param userId id of the authenticated user
* @param sessionId id of the created session */
case class CreateSessionSuccess(userId: UserId, sessionId: SessionId)
/** Reply sent via `ConnectionApi.reply` by `CreateSessionHandler` once the `SessionNotCreated` event is handled.
* @param reason why authentication failed */
case class CreateSessionFailure(reason: AuthFailureReason)
// command handlers

/** Handles `CreateSession` by authenticating the supplied credentials.
  * Fails with an `AuthFailureReason`, or succeeds with the `(UserId, SessionId)` pair. */
class CreateSessionHandler extends CommandHandler[CreateSession, AuthFailureReason, (UserId, SessionId)] {

  /** Authenticate through `AuthApi.authenticate` and emit exactly one event:
    * `SessionNotCreated` on failure or `SessionCreated` on success. Each emitted
    * event carries a callback that replies over `ConnectionApi` once it is handled.
    * @param command credentials to authenticate
    * @param ctx context, passed on to `ConnectionApi.reply`
    * @return the failure reason or the `(userId, sessionId)` pair, plus the emitted event */
  override def handle(command: CreateSession)
                     (implicit ctx: Context): CommandResult[AuthFailureReason, (UserId, SessionId)] =
    AuthApi.authenticate(command.username, command.password).map { authResult =>
      // Only the reason is exposed as the failure value; the optional user id
      // travels inside the emitted event instead.
      val outcome: AuthFailureReason Xor (UserId, SessionId) = authResult.leftMap(_._1)

      val emitted: EmittedEvent[_] = authResult match {
        case Xor.Left((reason, userId)) =>
          val notCreated = emit(SessionNotCreated(command.username, command.password, reason, userId))
          notCreated.whenHandled(ConnectionApi.reply(CreateSessionFailure(reason)))

        case Xor.Right((userId, sessionId)) =>
          val created = emit(SessionCreated(userId, sessionId))
          created.whenHandled(ConnectionApi.reply(CreateSessionSuccess(userId, sessionId)))
      }

      (outcome, List(emitted))
    }
}
// event handlers

/** Handles `SessionCreated` by saving the session through `AuthApi.saveSession`. */
class SessionCreatedHandler extends EventHandler[SessionCreated] {

  /** @param event created-session event
    * @return the task returned by `AuthApi.saveSession` for the event's user and session ids */
  override def handle(event: SessionCreated): Task[Unit] = {
    val owner = event.userId
    val session = event.sessionId
    AuthApi.saveSession(owner, session)
  }
}
/** Handles `AuthFailureNotificationSent` by delegating to
  * `AuthApi.updateLastFailureNotificationTimestamp`. */
class AuthFailureNotificationSentHandler extends EventHandler[AuthFailureNotificationSent] {

  /** @param event notification-sent event
    * @return the task returned by `AuthApi.updateLastFailureNotificationTimestamp` for the event's user */
  override def handle(event: AuthFailureNotificationSent): Task[Unit] = {
    val notifiedUser = event.userId
    AuthApi.updateLastFailureNotificationTimestamp(notifiedUser)
  }
}
// event listeners

/** Reacts to `SessionNotCreated` by sending a failed-authentication email when
  * the event carries a user id. */
class SessionNotCreatedListener extends EventListener[SessionNotCreated] {

  /** @param event failed session creation
    * @return when `event.userId` is defined, a task that sends the email through
    *         `AuthApi.sendFailedAuthenticationEmail` and then yields a single
    *         `AuthFailureNotificationSent`; otherwise a task yielding no events */
  override def onEvent(event: SessionNotCreated): Task[List[EmittedEvent[_]]] =
    event.userId.fold(emitNone) { knownUser =>
      val emailSent = AuthApi.sendFailedAuthenticationEmail(knownUser)
      emailSent.map(_ => emitOne(AuthFailureNotificationSent(knownUser)))
    }
}
// apis
/** Type class marker for values of type `T` that can be marshaled; declares no members here.
* Required as a context bound by `ConnectionApi.reply`.
* @tparam T type being marshaled */
trait Marshaler[T]
object Marshaler {
/** Provides a `Marshaler[T]` for any `T` with a circe `Encoder` in scope.
* Unimplemented stub: the body is `???`, so calling it throws `NotImplementedError`. */
implicit def jsonMarshaler[T: Encoder]: Marshaler[T] = ???
}
/** API for replying over a connection. */
object ConnectionApi {
/** Reply with `response`.
* Unimplemented stub: the body is `???`, so calling it throws `NotImplementedError`.
* @param response value to send; requires a `Marshaler[R]`
* @param ctx context (presumably identifies the connection to reply on - confirm)
* @return a `Task[Unit]` for the reply */
def reply[R: Marshaler](response: R)(implicit ctx: Context): Task[Unit] = ???
}
/** Authentication operations used by the handlers and listeners in this file.
* Every method is an unimplemented stub (`???`) that throws `NotImplementedError` when called. */
object AuthApi {
/** Authenticate the given credentials.
* @param username username to authenticate
* @param password password to check
* @return task yielding either a failure reason with an optional user id (left),
* or the user id and session id (right) */
def authenticate(username: Username,
password: String): Task[(AuthFailureReason, Option[UserId]) Xor (UserId, SessionId)] = ???
/** Save the session `sessionId` for `userId`; called by `SessionCreatedHandler`. */
def saveSession(userId: UserId, sessionId: SessionId): Task[Unit] = ???
/** Send the failed-authentication email to `userId`; called by `SessionNotCreatedListener`. */
def sendFailedAuthenticationEmail(userId: UserId): Task[Unit] = ???
/** Update the last failure-notification timestamp for `userId`; called by `AuthFailureNotificationSentHandler`. */
def updateLastFailureNotificationTimestamp(userId: UserId): Task[Unit] = ???
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment