Last active
August 29, 2015 14:02
-
-
Save ahjohannessen/70381de6da3bde1c743e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
The code below is not intended to be a general purpose approach, but rather a | |
simple CmdHandler / EventHandler model prototype that illustrates a concept of | |
how to share an Eventsourced Processor stream among many child actors of | |
same type. | |
Snapshotting is important for children in order to get quick start up and avoid | |
filtering too many sibling events. | |
Many things that should be handled are not present, such as errors, when to snapshot | |
and passivation support etc. | |
Note: Don't take the naming too seriously ;) | |
*/ | |
object Beam { | |
trait Msg { | |
def beamId: String | |
} | |
trait Event extends Msg | |
trait Command extends Msg | |
trait State | |
type CmdResult = Either[String, List[Event]] | |
type CmdHandler[S <: State] = PartialFunction[Command, S ⇒ CmdResult] | |
type EvtHandler[S <: State] = PartialFunction[Event, S ⇒ S] | |
case class PersistRequest(uuid: UUID, events: List[Event]) | |
case class PersistResponse(uuid: UUID) | |
} | |
trait Beam[S <: State] extends View { | |
def store: ActorRef | |
def beamId: String | |
def pid: String | |
override val viewId = s"$pid-$beamId" | |
override val processorId = pid | |
override val autoUpdate = false | |
def stateBuilder: S | |
def evtHandler: EvtHandler[S] | |
def cmdHandler: CmdHandler[S] | |
def receive: Receive = { | |
case cmd: Command if validC(cmd) ⇒ handle(cmd, sender()) | |
case Persistent(e: Event, _) if validE(e) ⇒ apply(e) | |
case SnapshotOffer(_, snap: S) ⇒ restore(snap) | |
case m ⇒ // ignore rest for now | |
} | |
// Either domain failure or domain events | |
def handle(cmd: Command, from: ActorRef) = { | |
cmdHandler(cmd)(state).fold( | |
fail ⇒ from ! s"fail ⇒ ${cmd.beamId}: $fail", | |
events ⇒ persist(events) { | |
apply(events:_*) | |
from ! s"ack ⇒ ${cmd.beamId}" | |
} | |
) | |
} | |
// Persisting | |
def persist(events: List[Event])(doneSignal: ⇒ Unit) = { | |
if(events.isEmpty) doneSignal else { | |
val uuid = randomUUID() | |
def awaitResult: Receive = { | |
case PersistResponse(`uuid`) ⇒ | |
doneSignal | |
context.unbecome() | |
unstashAll() | |
case _ ⇒ stash() | |
} | |
store ! PersistRequest(uuid, events) | |
context.become(awaitResult) | |
} | |
} | |
// Sanity Checks | |
def validM(m: Msg) = m.beamId == beamId | |
def validE(e: Event) : Boolean = validM(e) && evtHandler.isDefinedAt(e) | |
def validC(c: Command) : Boolean = validM(c) && cmdHandler.isDefinedAt(c) | |
// Side-effects | |
private def restore(snapshot: S) : Unit = | |
state = snapshot | |
private def apply(es: Event*) : Unit = | |
state = es.foldLeft(state)((s,e) ⇒ evtHandler(e)(s)) | |
private var state: S = stateBuilder | |
} | |
trait Beamer extends EventsourcedProcessor | |
with ActorContextCreationSupport { | |
def pid: String | |
override val processorId = pid | |
override def preStart() = { | |
self ! Recover(replayMax = 0L) | |
} | |
def receiveCommand = { | |
case cmd: Command ⇒ dispatch(cmd.beamId) forward cmd | |
case pr: PersistRequest ⇒ persist(pr.events) { _ ⇒ | |
sender() ! PersistResponse(pr.uuid) | |
} | |
} | |
def receiveRecover = { | |
case _ ⇒ // no-op | |
} | |
private def dispatch(beamId: String) = getOrCreateChild( | |
props = beamProps(beamId, self), | |
name = beamId | |
) | |
def beamProps(beamId: String, store: ActorRef): Props | |
} | |
// Alternatively, just a Sink. | |
// Beamer would then start, watch etc. sink and hand to children. | |
trait BeamSink extends EventsourcedProcessor { | |
def sinkId: String | |
final override val processorId = sinkId | |
final override def preStart() = | |
self ! Recover(replayMax = 0L) | |
final def receiveCommand = { | |
case pr: PersistRequest ⇒ persist(pr.events) { _ ⇒ | |
sender() ! PersistResponse(pr.uuid) | |
} | |
} | |
final def receiveRecover = Actor.emptyBehavior | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment