Last active
August 13, 2016 08:10
-
-
Save Tvaroh/f1500b036ecb2b721f3e31862fc1b037 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package io.treev.eventsourcing | |
import cats.data.Xor | |
import enumeratum.{CirceEnum, Enum, EnumEntry} | |
import io.circe.Encoder | |
import io.circe.generic.auto._ | |
import io.treev.eventsourcing.model.Context | |
import io.treev.model._ | |
import monix.eval.Task | |
object eventsourcing { | |
/** Result of executing a command: a `Task` yielding the failure-or-success value
  * paired with the list of events emitted along the way.
  * @tparam F failure type
  * @tparam S success type */
type CommandResult[F, S] = Task[(Xor[F, S], List[EmittedEvent[_]])]

/** Constructors for command results whose value is already known. */
object CommandResult {

  /** Wrap a success value and its emitted events into an already-completed result.
    * @param s success value
    * @param events events to attach to the result */
  def successful[F, S](s: S, events: EmittedEvent[_]*): CommandResult[F, S] = {
    val outcome: F Xor S = Xor.right(s)
    Task.now((outcome, events.toList))
  }

  /** Wrap a failure value and its emitted events into an already-completed result.
    * @param f failure value
    * @param events events to attach to the result */
  def failed[F, S](f: F, events: EmittedEvent[_]*): CommandResult[F, S] = {
    val outcome: F Xor S = Xor.left(f)
    Task.now((outcome, events.toList))
  }
}
/** Encodes evidence that a command `C` can emit event `E`.
  * Instances carry no data; they exist only to be found by implicit search.
  * @tparam C command
  * @tparam E event */
trait CommandEmitsEvent[C, E] {
  /** The evidence itself, exposed as an implicit member.
    * Points at this instance rather than `null` so that it is safe to dereference. */
  implicit val ev: CommandEmitsEvent[C, E] = this
}

object CommandEmitsEvent {
  /** Create evidence that command `C` can emit event `E`.
    * Returns a real (empty) instance instead of `null`, so the evidence can be
    * passed around, compared or pattern matched without risking a `NullPointerException`. */
  def apply[C, E]: CommandEmitsEvent[C, E] = new CommandEmitsEvent[C, E] {}
}
/** Encodes evidence that a listener for event `E` can emit event `E2`.
  * Instances carry no data; they exist only to be found by implicit search.
  * @tparam E listening event
  * @tparam E2 emitting event */
trait EventListenerEmitsEvent[E, E2] {
  /** The evidence itself, exposed as an implicit member.
    * Points at this instance rather than `null` so that it is safe to dereference. */
  implicit val ev: EventListenerEmitsEvent[E, E2] = this
}

object EventListenerEmitsEvent {
  /** Create evidence that a listener for `E` can emit event `E2`.
    * Returns a real (empty) instance instead of `null`, so the evidence can be
    * passed around, compared or pattern matched without risking a `NullPointerException`. */
  def apply[E, E2]: EventListenerEmitsEvent[E, E2] = new EventListenerEmitsEvent[E, E2] {}
}
/** Wrapper for an event emitted from a command handler or an event listener.
  * The constructor is restricted, so an event can become "emitted" only by means
  * of the internal API.
  * @tparam E underlying event
  * @see [[io.treev.eventsourcing.model.CommandHandler#emit]]
  * @see [[io.treev.eventsourcing.model.EventListener#emit]] */
class EmittedEvent[E] private[eventsourcing](_event: E,
                                             _handleCallback: Task[Unit] = Task.unit) {

  /** Execute callback after executing all handlers for this event.
    * The new callback is zipped with the one already registered, so chaining this
    * method results in multiple callbacks executed potentially concurrently.
    * @param cb callback
    * @return a new emitted event wrapping the same underlying event with the combined callback */
  def whenHandled(cb: => Task[Unit]): EmittedEvent[E] = {
    // the existing callback is listed first, the newly supplied one second
    val callbacks = List(handleCallback, cb)
    val combined = Task.zipList(callbacks).map(_ => ())
    new EmittedEvent[E](_event, combined)
  }

  /** Underlying event. */
  private[eventsourcing] def event: E = _event

  /** Callback registered so far (defaults to `Task.unit`). */
  private[eventsourcing] def handleCallback: Task[Unit] = _handleCallback
}
/** Interface implemented by command handlers.
  * @tparam C command
  * @tparam F failure type
  * @tparam S success type */
trait CommandHandler[C, F, S] {

  /** Handle command.
    * @param command command
    * @param ctx context
    * @return command result - success `S` or failure `F` with a list of emitted events */
  def handle(command: C)(implicit ctx: Context): CommandResult[F, S]

  /** Wrap an event as emitted. Compiles only when evidence exists that command `C`
    * is allowed to emit `E`.
    * @param event event to emit */
  def emit[E](event: E)(implicit ev: CommandEmitsEvent[C, E]): EmittedEvent[E] =
    new EmittedEvent[E](event)

  /** Wrap an event as emitted and return it as a single-element list.
    * @param event event to emit */
  def emitOne[E](event: E)(implicit ev: CommandEmitsEvent[C, E]): List[EmittedEvent[E]] =
    emit(event) :: Nil

  /** Create zero emitted events: a completed task holding an empty list. */
  def emitNone: Task[List[EmittedEvent[_]]] = CommandHandler.noop
}

private object CommandHandler {
  /** Shared "no events" value returned by `emitNone`. */
  val noop: Task[List[EmittedEvent[_]]] = Task.now(List.empty[EmittedEvent[_]])
}
/** Interface implemented by event handlers.
  * @tparam E event */
trait EventHandler[E] {

  /** Handle event.
    * @param event event
    * @return task describing the handling of the event */
  def handle(event: E): Task[Unit]
}
/** Interface implemented by event listeners.
  * @tparam E event being listened to */
trait EventListener[E] {

  /** Event handled callback.
    * Is executed after an event is handled by an `EventHandler`.
    * @param event event
    * @return task holding a list of additional events to emit */
  def onEvent(event: E): Task[List[EmittedEvent[_]]]

  /** Wrap an event as emitted. Compiles only when evidence exists that a listener
    * for `E` is allowed to emit `E2`.
    * @param event event to emit */
  def emit[E2](event: E2)(implicit ev: EventListenerEmitsEvent[E, E2]): EmittedEvent[E2] =
    new EmittedEvent[E2](event)

  /** Wrap an event as emitted and return it as a single-element list.
    * @param event event to emit */
  def emitOne[E2](event: E2)(implicit ev: EventListenerEmitsEvent[E, E2]): List[EmittedEvent[E2]] =
    emit(event) :: Nil

  /** Create zero emitted events: a completed task holding an empty list. */
  def emitNone: Task[List[EmittedEvent[_]]] = EventListener.noop
}

private object EventListener {
  /** Shared "no events" value returned by `emitNone`. */
  val noop: Task[List[EmittedEvent[_]]] = Task.now(List.empty[EmittedEvent[_]])
}
// commands

/** Command carrying the credentials that `CreateSessionHandler` authenticates.
  * @param username user name to authenticate
  * @param password password to authenticate with */
case class CreateSession(username: Username, password: String)

object CreateSession {
  // Implicit evidence is given an explicit type: untyped implicit vals make implicit
  // resolution order-dependent in Scala 2 and are rejected by Scala 3.

  /** Evidence that `CreateSession` may emit `SessionCreated`. */
  implicit val emitsSessionCreated: CommandEmitsEvent[CreateSession, SessionCreated] =
    CommandEmitsEvent[CreateSession, SessionCreated]

  /** Evidence that `CreateSession` may emit `SessionNotCreated`. */
  implicit val emitsSessionNotCreated: CommandEmitsEvent[CreateSession, SessionNotCreated] =
    CommandEmitsEvent[CreateSession, SessionNotCreated]
}
// events

/** Event emitted by `CreateSessionHandler` when authentication succeeds.
  * @param userId authenticated user
  * @param sessionId identifier of the new session */
case class SessionCreated(userId: UserId, sessionId: SessionId)

/** Event emitted by `CreateSessionHandler` when authentication fails.
  * @param username user name from the command
  * @param password password from the command
  * @param reason why authentication failed
  * @param userId user id, when one was returned alongside the failure */
case class SessionNotCreated(username: Username,
                             password: String,
                             reason: AuthFailureReason,
                             userId: Option[UserId])

object SessionNotCreated {
  /** Evidence that a listener for `SessionNotCreated` may emit `AuthFailureNotificationSent`.
    * Typed explicitly: untyped implicit vals make implicit resolution order-dependent
    * in Scala 2 and are rejected by Scala 3. */
  implicit val emitsAuthFailureNotificationSent: EventListenerEmitsEvent[SessionNotCreated, AuthFailureNotificationSent] =
    EventListenerEmitsEvent[SessionNotCreated, AuthFailureNotificationSent]
}

/** Event emitted by `SessionNotCreatedListener` after the failed-authentication email task completes.
  * @param userId user the notification was sent for */
case class AuthFailureNotificationSent(userId: UserId)
/** Reason an authentication attempt failed. */
sealed abstract class AuthFailureReason extends EnumEntry {

  /** Code of this reason; same as the enumeratum entry name. */
  def code: String = this.entryName
}

/** Enumeration of all failure reasons, mixing in `CirceEnum` alongside `Enum`. */
object AuthFailureReason extends CirceEnum[AuthFailureReason] with Enum[AuthFailureReason] {

  /** All declared reasons, collected by enumeratum's `findValues`. */
  override val values: Seq[AuthFailureReason] = findValues

  case object WrongUsername extends AuthFailureReason

  case object WrongPassword extends AuthFailureReason
}
// responses

/** Reply sent by `CreateSessionHandler` once a `SessionCreated` event has been handled.
  * @param userId authenticated user
  * @param sessionId identifier of the new session */
case class CreateSessionSuccess(userId: UserId, sessionId: SessionId)

/** Reply sent by `CreateSessionHandler` once a `SessionNotCreated` event has been handled.
  * @param reason why authentication failed */
case class CreateSessionFailure(reason: AuthFailureReason)
// command handlers

/** Handles `CreateSession` by authenticating the supplied credentials.
  * Emits exactly one event per command - `SessionCreated` on success or
  * `SessionNotCreated` on failure - and attaches a reply to the connection that is
  * run once that event has been handled. */
class CreateSessionHandler extends CommandHandler[CreateSession, AuthFailureReason, (UserId, SessionId)] {

  /** Authenticate and translate the outcome into a command result.
    * @param command credentials to authenticate
    * @param ctx context, passed on implicitly to `ConnectionApi.reply`
    * @return the failure reason or the `(userId, sessionId)` pair, with the single emitted event */
  override def handle(command: CreateSession)
                     (implicit ctx: Context): CommandResult[AuthFailureReason, (UserId, SessionId)] =
    AuthApi.authenticate(command.username, command.password).map { authResult =>
      // callers only see the failure reason; the optional user id stays in the event
      val outcome = authResult.leftMap(_._1)

      val emitted: EmittedEvent[_] = authResult.fold(
        { case (reason, maybeUserId) =>
          // NOTE(review): the raw password is carried inside the event - confirm that
          // events are never persisted or logged in plain text.
          val failure = SessionNotCreated(command.username, command.password, reason, maybeUserId)
          emit(failure).whenHandled {
            ConnectionApi.reply(CreateSessionFailure(reason))
          }
        },
        { case (userId, sessionId) =>
          val success = SessionCreated(userId, sessionId)
          emit(success).whenHandled {
            ConnectionApi.reply(CreateSessionSuccess(userId, sessionId))
          }
        }
      )

      (outcome, List[EmittedEvent[_]](emitted))
    }
}
// event handlers

/** Handles `SessionCreated` by delegating to `AuthApi.saveSession`. */
class SessionCreatedHandler extends EventHandler[SessionCreated] {

  /** @param event created-session event
    * @return the task returned by `AuthApi.saveSession` for the event's user and session ids */
  override def handle(event: SessionCreated): Task[Unit] = {
    import event.{sessionId, userId}
    AuthApi.saveSession(userId, sessionId)
  }
}
/** Handles `AuthFailureNotificationSent` by delegating to
  * `AuthApi.updateLastFailureNotificationTimestamp`. */
class AuthFailureNotificationSentHandler extends EventHandler[AuthFailureNotificationSent] {

  /** @param event notification-sent event
    * @return the task returned by `AuthApi.updateLastFailureNotificationTimestamp` for the event's user */
  override def handle(event: AuthFailureNotificationSent): Task[Unit] = {
    val notifiedUser = event.userId
    AuthApi.updateLastFailureNotificationTimestamp(notifiedUser)
  }
}
// event listeners

/** Listens for `SessionNotCreated` and, when the event carries a user id, sends the
  * failed-authentication email and emits `AuthFailureNotificationSent`. */
class SessionNotCreatedListener extends EventListener[SessionNotCreated] {

  /** @param event failed session creation
    * @return a single `AuthFailureNotificationSent` once the email task completes,
    *         or no events when the event has no user id */
  override def onEvent(event: SessionNotCreated): Task[List[EmittedEvent[_]]] =
    event.userId.fold(emitNone) { userId =>
      val emailSent = AuthApi.sendFailedAuthenticationEmail(userId)
      emailSent.map(_ => emitOne(AuthFailureNotificationSent(userId)))
    }
}
// apis

/** Type class required by `ConnectionApi.reply` for its response type.
  * @tparam T type being marshaled */
trait Marshaler[T]

object Marshaler {
  /** Marshaler for any `T` that has a circe `Encoder` in scope.
    * Left unimplemented (`???`). */
  implicit def jsonMarshaler[T](implicit encoder: Encoder[T]): Marshaler[T] = ???
}
/** Connection operations. Signature only; left unimplemented (`???`). */
object ConnectionApi {

  /** Send a response.
    * @param response response value; requires a `Marshaler` for its type
    * @param ctx context */
  def reply[R](response: R)(implicit marshaler: Marshaler[R], ctx: Context): Task[Unit] = ???
}
/** Authentication operations. Signatures only; every member is left unimplemented (`???`). */
object AuthApi {

  /** Authenticate the given credentials.
    * @return on the left a failure reason with an optional user id,
    *         on the right the user id and session id */
  def authenticate(username: Username,
                   password: String): Task[Xor[(AuthFailureReason, Option[UserId]), (UserId, SessionId)]] = ???

  /** Save a session for the given user. */
  def saveSession(userId: UserId, sessionId: SessionId): Task[Unit] = ???

  /** Send the failed-authentication email for the given user. */
  def sendFailedAuthenticationEmail(userId: UserId): Task[Unit] = ???

  /** Update the last-failure-notification timestamp for the given user. */
  def updateLastFailureNotificationTimestamp(userId: UserId): Task[Unit] = ???
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment