Skip to content

Instantly share code, notes, and snippets.

@krasserm
Last active December 12, 2015 00:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save krasserm/4682655 to your computer and use it in GitHub Desktop.
Save krasserm/4682655 to your computer and use it in GitHub Desktop.
Event sourcing and external servce integration (blog post snippets)
case class Order(id: Int = -1, details: String,
creditCardNumber: String,
creditCardValidation: Validation = Validation.Pending)
sealed trait Validation
object Validation {
case object Pending extends Validation
case object Success extends Validation
case object Failure extends Validation
}
case class OrderSubmitted(order: Order)
case class CreditCardValidationRequested(orderId: Int, creditCardNumber: String)
case class CreditCardValidated(orderId: Int)
case class CreditCardValidationFailed(orderId: Int)
new OrderProcessor(id) with Receiver with Eventsourced
class OrderProcessor(val id: Int) extends Actor { this: Receiver =>
var validationRequestChannel: ActorRef = …
var orders = Map.empty[Int, Order] // state
def receive = {
case OrderSubmitted(order) => {
val id = orders.size
val upd = order.copy(id = id)
orders = orders + (id -> upd)
sender ! OrderStored(upd)
validationRequestChannel ! message.copy(CreditCardValidationRequested(id, order.creditCardNumber))
}
}
}
case class SetCreditCardValidator(validator: ActorRef)
class OrderProcessor(val id: Int) extends Actor { this: Receiver =>
val ext = EventsourcingExtension(context.system)
var validationRequestChannel: ActorRef = …
var orders = Map.empty[Int, Order]
def receive = {
case SetCreditCardValidator(validator) => {
validationRequestChannel = ext.channelOf(ReliableRequestReplyChannelProps(…, validator)
.withRedeliveryMax(10)
.withRedeliveryDelay(3 seconds)
.withReplyTimeout(5 second))
}
}
}
val extension: EventsourcingExtension = …
val processor: ActorRef = extension.processorOf(ProcessorProps(1, id => new OrderProcessor(id) with Receiver with Eventsourced))
val validator: ActorRef = … // remote lookup
processor ! SetCreditCardValidator(validator)
class OrderProcessor(val id: Int) extends Actor with ActorLogging { this: Receiver =>
import Validation._
var orders = Map.empty[Int, Order]
def receive = {
case CreditCardValidated(orderId) => {
onValidationSuccess(orderId)
confirm(true)
}
case CreditCardValidationFailed(orderId) => {
onValidationFailure(orderId)
confirm(true)
}
case DestinationNotResponding(channelId, failureCount, request) => {
log.warning("Destination of channel {} does not respond (failure count = {}). Negatively confirm message receipt.", channelId, failureCount)
confirm(false) // retry (or escalate)
}
case DestinationFailure(channelId, failureCount, CreditCardValidationRequested(orderId, _), throwable) => {
if (failureCount > 2) {
onValidationFailure(orderId)
confirm(true)
} else {
log.warning("Destination of channel {} returned a failure (failure count = {}). Negatively confirm message receipt.", channelId, failureCount)
confirm(false) // retry
}
}
}
def onValidationSuccess(orderId: Int) {
orders.get(orderId).filter(_.creditCardValidation == Pending).foreach { order =>
val upd = order.copy(creditCardValidation = Success)
orders = orders + (orderId -> upd)
// notify others about accepted order
// …
}
}
def onValidationFailure(orderId: Int) {
orders.get(orderId).filter(_.creditCardValidation == Pending).foreach { order =>
val upd = order.copy(creditCardValidation = Failure)
orders = orders + (orderId -> upd)
// notify others about rejected order
// …
}
}
}
class OrderProcessor(val id: Int) extends Actor with ActorLogging { this: Receiver =>
def receive = {
case DeliveryStopped(channelId) => {
val delay = FiniteDuration(5, "seconds")
log.warning("Channel {} stopped delivery. Reactivation in {}.", channelId, delay)
context.system.scheduler.scheduleOnce(delay, validationRequestChannel, Deliver)(context.dispatcher)
}
}
override def preStart() {
context.system.eventStream.subscribe(self, classOf[DeliveryStopped])
}
override def postStop() {
context.system.eventStream.unsubscribe(self, classOf[DeliveryStopped])
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment