Skip to content

Instantly share code, notes, and snippets.

@ahjohannessen
Last active August 29, 2015 14:02
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ahjohannessen/70381de6da3bde1c743e to your computer and use it in GitHub Desktop.
Save ahjohannessen/70381de6da3bde1c743e to your computer and use it in GitHub Desktop.
/*
The code below is not intended to be a general purpose approach, but rather a
simple CmdHandler / EventHandler model prototype that illustrates a concept of
how to share an Eventsourced Processor stream among many child actors of
same type.
Snapshotting is important for children in order to get quick start up and avoid
filtering too many sibling events.
Many things that should be handled are not present, such as errors, when to snapshot
and passivation support etc.
Note: Don't take the naming too seriously ;)
*/
object Beam {
trait Msg {
def beamId: String
}
trait Event extends Msg
trait Command extends Msg
trait State
type CmdResult = Either[String, List[Event]]
type CmdHandler[S <: State] = PartialFunction[Command, S ⇒ CmdResult]
type EvtHandler[S <: State] = PartialFunction[Event, S ⇒ S]
case class PersistRequest(uuid: UUID, events: List[Event])
case class PersistResponse(uuid: UUID)
}
trait Beam[S <: State] extends View {
def store: ActorRef
def beamId: String
def pid: String
override val viewId = s"$pid-$beamId"
override val processorId = pid
override val autoUpdate = false
def stateBuilder: S
def evtHandler: EvtHandler[S]
def cmdHandler: CmdHandler[S]
def receive: Receive = {
case cmd: Command if validC(cmd) ⇒ handle(cmd, sender())
case Persistent(e: Event, _) if validE(e) ⇒ apply(e)
case SnapshotOffer(_, snap: S) ⇒ restore(snap)
case m ⇒ // ignore rest for now
}
// Either domain failure or domain events
def handle(cmd: Command, from: ActorRef) = {
cmdHandler(cmd)(state).fold(
fail ⇒ from ! s"fail ⇒ ${cmd.beamId}: $fail",
events ⇒ persist(events) {
apply(events:_*)
from ! s"ack ⇒ ${cmd.beamId}"
}
)
}
// Persisting
def persist(events: List[Event])(doneSignal: ⇒ Unit) = {
if(events.isEmpty) doneSignal else {
val uuid = randomUUID()
def awaitResult: Receive = {
case PersistResponse(`uuid`) ⇒
doneSignal
context.unbecome()
unstashAll()
case _ ⇒ stash()
}
store ! PersistRequest(uuid, events)
context.become(awaitResult)
}
}
// Sanity Checks
def validM(m: Msg) = m.beamId == beamId
def validE(e: Event) : Boolean = validM(e) && evtHandler.isDefinedAt(e)
def validC(c: Command) : Boolean = validM(c) && cmdHandler.isDefinedAt(c)
// Side-effects
private def restore(snapshot: S) : Unit =
state = snapshot
private def apply(es: Event*) : Unit =
state = es.foldLeft(state)((s,e) ⇒ evtHandler(e)(s))
private var state: S = stateBuilder
}
trait Beamer extends EventsourcedProcessor
with ActorContextCreationSupport {
def pid: String
override val processorId = pid
override def preStart() = {
self ! Recover(replayMax = 0L)
}
def receiveCommand = {
case cmd: Command ⇒ dispatch(cmd.beamId) forward cmd
case pr: PersistRequest ⇒ persist(pr.events) { _ ⇒
sender() ! PersistResponse(pr.uuid)
}
}
def receiveRecover = {
case _ ⇒ // no-op
}
private def dispatch(beamId: String) = getOrCreateChild(
props = beamProps(beamId, self),
name = beamId
)
def beamProps(beamId: String, store: ActorRef): Props
}
// Alternatively, just a Sink.
// Beamer would then start, watch etc. sink and hand to children.
trait BeamSink extends EventsourcedProcessor {
def sinkId: String
final override val processorId = sinkId
final override def preStart() =
self ! Recover(replayMax = 0L)
final def receiveCommand = {
case pr: PersistRequest ⇒ persist(pr.events) { _ ⇒
sender() ! PersistResponse(pr.uuid)
}
}
final def receiveRecover = Actor.emptyBehavior
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment