Skip to content

Instantly share code, notes, and snippets.

@douglaz
Created March 8, 2017 17:03
Show Gist options
  • Save douglaz/e06c1b6dbe95a14407866165af50ad1e to your computer and use it in GitHub Desktop.
Save douglaz/e06c1b6dbe95a14407866165af50ad1e to your computer and use it in GitHub Desktop.
Partial example of an Akka FSM with well defined messages and states
object PeriodDirectorActor {
// Events received from other actors or generated internally
object Events {
sealed abstract trait EventType
// Public events
case class Start(conf: FullConfiguration) extends EventType
case object Stop extends EventType
// Called by StreamReaders
case class ReaderWorkAvailable(work: CartView) extends EventType
case class ReaderWork(event: CartView) extends EventType
case object ReaderWorkUnavailable extends EventType // the requested work or the one previously advertised as available is unavailable now
case object ReaderStopped extends EventType
// Called by Workers
case object WorkerIsReadyForWork extends EventType
case object WorkRequiresCommit extends EventType
case object WorkFinished extends EventType
// Internal Events
case class ConsumerCreated(consumer: ConsumerConnectorApi, streams: List[KafkaByteIteratorApi]) extends EventType
case class ConsumerFailed(t: Throwable) extends EventType
case object CommitGraceTimeout extends EventType
case object RunningTimeout extends EventType
case object CommitSuccess extends EventType
case class CommitFailure(t: Throwable) extends EventType
}
// Possible states
object State {
sealed abstract trait StateType
case object Idle extends StateType
case object Starting extends StateType
case object Running extends StateType
case object WaitingCommit extends StateType
case object Stopping extends StateType
}
// Our internal data representation
object Data {
sealed abstract trait DataType
case object Uninitialized extends DataType
case class Initializing(conf: FullConfiguration) extends DataType
object ReaderState {
sealed abstract trait StateType
case object WithoutWork extends StateType
case class WithWork(cart: CartView) extends StateType
case object ReadingWork extends StateType
case object Stopping extends StateType
}
object WorkerState {
sealed abstract trait StateType
case object Idle extends StateType
case object ReadyToWork extends StateType
case class WithWork(cart: CartView) extends StateType
case class WaitingCommit(cart: CartView) extends StateType
}
type ReadersStateMap = Map[ActorRef, ReaderState.StateType]
type WorkersStateMap = Map[ActorRef, WorkerState.StateType]
case class Initialized(consumer: ConsumerConnectorApi,
committed: Boolean,
conf: FullConfiguration,
readers: ReadersStateMap,
workers: WorkersStateMap) extends DataType
}
def props(period: Period, programManager: ActorRef) =
Props(new PeriodDirectorActor(period, programManager))
}
class PeriodDirectorActor(period: Period, programManager: ActorRef) extends LoggableFSM[State.StateType, Data.DataType]
with ActorLogging
with BlockingIOExecutionContext
with LogUnhandledEvents[State.StateType, Data.DataType] {
startWith(State.Idle, Data.Uninitialized)
when(State.Idle) {
case Event(Events.WorkerIsReadyForWork, _) =>
log.debug(s"Received WorkerIsReadyForWork probably sent from a worker before dying, ignoring...")
lstay
case Event(Events.Start(conf), Data.Uninitialized) =>
Telemetry.directorTryingStart(period)
log.info(s"Starting for period $period")
asyncCreateConsumer()
goto(State.Starting) using Data.Initializing(conf)
}
// This is just a snippet. The actual file is 500+ lines of code
}
@douglaz
Copy link
Author

douglaz commented Mar 8, 2017

This is a coordinator that read some data from Kafka readers and distribute it to workers. It also implements a sort of 2-phase commit.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment