Skip to content

Instantly share code, notes, and snippets.

@ivanopagano
Forked from IainHull/OrderExampleReliable.scala
Last active August 29, 2015 14:07
Show Gist options
  • Save ivanopagano/febfa1f30b88d75d4351 to your computer and use it in GitHub Desktop.
Save ivanopagano/febfa1f30b88d75d4351 to your computer and use it in GitHub Desktop.
name := "external-service-integration-akka-persistence"
version := "1.0"
scalaVersion := "2.11.1"
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-persistence-experimental" % "2.3.6",
"com.typesafe.akka" %% "akka-remote" % "2.3.6"
)
common {
akka {
actor {
provider = "akka.remote.RemoteActorRefProvider"
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "127.0.0.1"
}
}
loglevel = INFO
}
}
processor {
akka {
remote {
netty.tcp {
port = 2553
}
}
}
}
validator {
akka {
remote {
netty.tcp {
port = 2552
}
}
}
}
package org.iainhull.akka
import scala.concurrent.duration._
import akka.actor._
import akka.event.Logging
import akka.pattern.ask
import akka.util.Timeout
import akka.persistence.{PersistentView, AtLeastOnceDelivery, PersistentActor}
import akka.persistence.AtLeastOnceDelivery.{UnconfirmedDelivery, UnconfirmedWarning}
import com.typesafe.config.ConfigFactory
// ------------------------------------
// Domain object
// ------------------------------------
case class Order(id: Int = -1, details: String,
creditCardNumber: String,
creditCardValidation: Validation = Validation.Pending)
sealed trait Validation
object Validation {
case object Pending extends Validation
case object Success extends Validation
case object Failure extends Validation
}
// ------------------------------------
// Domain events
// ------------------------------------
sealed trait Command
case class OrderSubmitted(order: Order) extends Command
sealed trait Event
case class OrderStored(order: Order) extends Event
case class OrderAccepted(order: Order, deliveryId: Long) extends Event
case class OrderRejected(order: Order, deliveryId: Long) extends Event
case class CreditCardValidationRequested(deliveryId: Long, orderId: Int, creditCardNumber: String)
case class CreditCardValidated(deliveryId: Long, orderId: Int)
case class CreditCardValidationFailed(deliveryId: Long, orderId: Int)
// ------------------------------------
// Application commands/events
// ------------------------------------
case class SetCreditCardValidator(validator: ActorPath)
case class SetValidOrderDestination(destination: ActorRef)
case class SetInvalidOrderDestination(destination: ActorRef)
case class Recover(timeout: Timeout)
case object Recovered
// ------------------------------------
// Eventsourced order processor
// ------------------------------------
class OrderProcessor extends PersistentActor with AtLeastOnceDelivery {
import Validation._
var validator: ActorPath = context.system.deadLetters.path
var orders = Map.empty[Int, Order] // state
override def persistenceId = "order-processor"
def updateState(event: Event): Unit = event match {
case OrderStored(order) =>
orders = orders + (order.id -> order)
deliver(validator, deliveryId => CreditCardValidationRequested(deliveryId, order.id, order.creditCardNumber))
case OrderAccepted(order, deliveryId) =>
orders = orders + (order.id -> order)
confirmDelivery(deliveryId)
case OrderRejected(order, deliveryId) =>
orders = orders + (order.id -> order)
confirmDelivery(deliveryId)
}
val receiveCommand: Receive = {
case OrderSubmitted(order) =>
val id = orders.size
persist(OrderStored(order.copy(id = id))) { event =>
updateState(event)
sender ! event
}
case CreditCardValidated(deliveryId, orderId) =>
orders.get(orderId) foreach { order =>
if (order.creditCardValidation == Pending) {
persist(OrderAccepted(order.copy(creditCardValidation = Success), deliveryId))(updateState)
}
}
case CreditCardValidationFailed(deliveryId, orderId) =>
orders.get(orderId) foreach { order =>
if (order.creditCardValidation == Pending) {
persist(OrderRejected(order.copy(creditCardValidation = Failure), deliveryId))(updateState)
}
}
case UnconfirmedWarning(unconfirmedDeliveries) =>
for {
UnconfirmedDelivery(deliveryId, _, CreditCardValidationRequested(_, orderId, _)) <- unconfirmedDeliveries
order <- orders.get(orderId)
} {
persist(OrderRejected(order.copy(creditCardValidation = Failure), deliveryId))(updateState)
}
case SetCreditCardValidator(v) =>
validator = v
}
val receiveRecover: Receive = {
case event: Event => updateState(event)
}
}
object OrderProcessor extends App {
val config = ConfigFactory.load("order")
val configCommon = config.getConfig("common")
println(config.getConfig("processor").withFallback(configCommon))
implicit val system = ActorSystem("example", config.getConfig("processor").withFallback(configCommon))
implicit val timeout = Timeout(5 seconds)
import system.dispatcher
val log = Logging(system, this.getClass)
val destination = system.actorOf(Props[OrderDestination], "destination")
val processor = system.actorOf(Props[OrderProcessor], "processor")
val validator = ActorPath.fromString("akka.tcp://example@127.0.0.1:2552/user/validator")
processor ! SetCreditCardValidator(validator)
val f1 = processor ? OrderSubmitted(Order(details = "jelly beans", creditCardNumber = "1234-5678-1234-5678"))
val f2 = processor ? OrderSubmitted(Order(details = "jelly beans", creditCardNumber = "1234-5678-1234-0000"))
for (r1 <- f1; r2 <- f2) {
log.info("Reply 1: {}", r1)
log.info("Reply 2: {}", r2)
}
}
// ------------------------------------
// Local receiver of orders after
// credit card validation
// ------------------------------------
class OrderDestination extends PersistentView with ActorLogging {
override def persistenceId = "order-processor"
override def viewId = "order-processor-view"
def receive = {
case OrderAccepted(order, _) => log.info("Received accepted order: {}", order)
case OrderRejected(order, _) => log.info("Received rejected order: {}", order)
}
}
// ------------------------------------
// Remote credit card validator
// ------------------------------------
class CreditCardValidator extends Actor {
def receive = {
case CreditCardValidationRequested(deliveryId, orderId, creditCardNumber) =>
if (creditCardNumber.contains("0000")) {
sender ! CreditCardValidationFailed(deliveryId, orderId)
} else {
sender ! CreditCardValidated(deliveryId, orderId)
}
}
}
object CreditCardValidator extends App {
val config = ConfigFactory.load("order")
val configCommon = config.getConfig("common")
val system = ActorSystem("example", config.getConfig("validator").withFallback(configCommon))
system.actorOf(Props[CreditCardValidator], "validator")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment