Skip to content

Instantly share code, notes, and snippets.

@strobe
Last active August 29, 2015 14:09
Show Gist options
  • Save strobe/aa7f52eaf5b64d9cab73 to your computer and use it in GitHub Desktop.
Save strobe/aa7f52eaf5b64d9cab73 to your computer and use it in GitHub Desktop.
Akka Clustering issue with per node communication
akka {
loglevel = INFO
log-dead-letters = off
log-dead-letters-during-shutdown = off
# /// Clusterring. /// #
# For clustering the following configuration needs to be enabled
actor {
provider = "akka.cluster.ClusterActorRefProvider"
# //#config-router-lookup
deployment {
/spray-service/routerActor/workerRouter {
router = round-robin-group
nr-of-instances = 10
routees.paths = ["/user/backend"]
cluster {
enabled = on
allow-local-routees = on
}
}
}
# //#config-router-lookup
}
remote {
log-remote-lifecycle-events = off
netty.tcp {
hostname = "127.0.0.1"
port = 0
}
}
cluster {
# list of seed nodes which may be started on same JVM (just few Akka Systems)
# and on different JVMs (as well on different nodes)
seed-nodes = [
"akka.tcp://ClusterSystem@127.0.0.1:2551",
"akka.tcp://ClusterSystem@127.0.0.1:2552"]
# this setting is used for case when nodes is fail,
# after that timeout node will be removed from cluster
auto-down-unreachable-after = 10s
}
# remote.netty.tcp.maximum-frame-size = 384000b
# /// end /// #
}
object BackendApp {
def main(args: Array[String]): Unit = {
// Override the configuration of the port when specified as program argument
val port = if (args.isEmpty) "0" else args(0)
val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port").
withFallback(ConfigFactory.parseString("akka.cluster.roles = [backend]")).
withFallback(ConfigFactory.load())
val system = ActorSystem("ClusterSystem", config)
system.actorOf(Props[BackendWorkerActor], name = "backend")
}
}
class BackendsRouterActor extends Actor with ActorLogging {
// router which defined in config
val workersRouter = context.actorOf(FromConfig.props(Props.empty), name = "workerRouter")
implicit def executionContext: ExecutionContextExecutor = context.dispatcher
def receive = {
case ToRouter(message: BackendMessage) => {
workersRouter ! ToBackEnd(message, sender())
}
case BackToRouter(message: FrontendMessage, origin: ActorRef) => {
log.info("BackToRouter recevied to BackendsRouterActor ")
origin ! BackToOrigin(message)
}
}
}
class BackendWorkerActor extends Actor with ActorLogging {
val cluster = Cluster(context.system)
implicit def executionContext: ExecutionContextExecutor = context.dispatcher
def receive = {
case ToBackEnd(message: BackendMessage, origin: ActorRef) => {
message match {
case TestAsk(ctx: RequestContext) => {
log.info("ToBackEndTestAskResponse recevied to BackendWorkerActor ")
sender ! BackToRouter(TestAskResponse("job result"), origin)
}
case SimpleStringStream(ctx: RequestContext) => {
val stream: Stream[String] = new Streamer(context).simpleStringStream
sender ! BackToRouter(SimpleStringStreamResponse(stream), origin)
}
}
}
}
}
object ClusterApp {
def main(args: Array[String]): Unit = {
FrontendSprayApp.main(Seq("2551").toArray)
BackendApp.main(Seq("2552").toArray)
BackendApp.main(Array.empty)
BackendApp.main(Array.empty)
}
}
object FrontendSprayApp {
def main(args: Array[String]): Unit = {
// Override the configuration of the port when specified as program argument
val port = if (args.isEmpty) "0" else args(0)
val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port").
withFallback(ConfigFactory.parseString("akka.cluster.roles = [frontend]")).
withFallback(ConfigFactory.load())
implicit val system = ActorSystem("ClusterSystem", config)
// server actor initialization
val service = system.actorOf(Props[FrontendServerActor], "spray-service")
// http initialization
IO(Http) ! Http.Bind(service, interface = "localhost", port = 8050)
}
}
class FrontendServerActor extends Actor with ChunksTestService with ActorLogging {
def actorRefFactory = context
def receive = {
runRoute(apiRoute)
}
}
trait ChunksTestService extends HttpService {
// These implicit values allow us to use futures
// in this trait.
implicit def executionContext = actorRefFactory.dispatcher
implicit val timeout = Timeout(5.seconds)
// Our worker Actor handles the work of the request.
val router = actorRefFactory.actorOf(Props[BackendsRouterActor], "routerActor")
val apiRoute =
path("ping") {
get {
complete("PONG")
}
} ~
path("html_stream") {
get {
respondWithMediaType(`text/html`) {
ctx => doSimpleStringStream(ctx)
}
}
}
def doTestAskResponse(ctx: RequestContext) = {
actorRefFactory.actorOf {
Props {
new Actor with ActorLogging {
// To set an initial possible delay
context.setReceiveTimeout(600 millis)
override def preStart = {
router ! ToRouter(TestAsk(ctx))
}
def receive = {
case BackToOrigin(message: FrontendMessage) =>
message match {
case TestAskResponse(text: String) =>
ctx.responder ! HttpResponse(StatusCodes.OK, text)
}
case ReceiveTimeout =>
// To turn it off
ctx.responder ! HttpResponse(StatusCodes.NetworkConnectTimeout,
"backend worker doesn't produced result in expected time!")
}
}
}
}
}
def doSimpleStringStream(ctx: RequestContext) = {
actorRefFactory.actorOf {
Props {
new Actor with ActorLogging {
override def preStart = {
router ! ToRouter(SimpleStringStream(ctx))
}
def receive = {
case BackToOrigin(message: FrontendMessage) =>
message match {
case SimpleStringStreamResponse(stream: Stream[String]) =>
ctx.responder ! complete(stream)
}
case ReceiveTimeout =>
ctx.responder ! HttpResponse(StatusCodes.NetworkConnectTimeout,
"backend worker doesn't produced result in expected time!")
}
}
}
}
}
}
// possible messages types
object BackendMessages {
//
case class ToRouter(message: BackendMessage)
case class ToBackEnd(message: BackendMessage, originRef: ActorRef)
//
case class BackToRouter(message: FrontendMessage, originRef: ActorRef)
case class BackToOrigin(message: FrontendMessage)
//
}
// messages which may send to backend
abstract class BackendMessage
case class TestAsk(ctx: RequestContext) extends BackendMessage
case class SimpleStringStream(ctx: RequestContext) extends BackendMessage
// messages which may send to frontend
abstract class FrontendMessage
case class TestAskResponse(text: String) extends FrontendMessage
case class SimpleStringStreamResponse(stream: Stream[String]) extends FrontendMessage
class Streamer(actorRefFactory: ActorContext) (implicit ec: ExecutionContext) {
lazy val streamStart = " " * 2048 + "<html><body><h2>A streaming response</h2><p>(for 15 seconds)<ul>"
lazy val streamEnd = "</ul><p>Finished.</p></body></html>"
def simpleStringStream: Stream[String] = {
val secondStream = Stream.continually {
// CAUTION: we block here to delay the stream generation for you to
// be able to follow it in your browser,
// this is only done for the purpose of this demo, blocking in actor
// code should otherwise be avoided
// Thread.sleep(500)
"<li>" + DateTime.now.toIsoDateTimeString + "</li>"
}
streamStart #:: secondStream.take(15) #::: streamEnd #:: Stream.empty
}
}
@strobe
Copy link
Author

strobe commented Nov 13, 2014

GET to "/ask_test" returns "job result"
&
GET to "/html_stream" produced:

java.io.NotSerializableException: Streamer
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129)
at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129)
at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36)
at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:845)
at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:845)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:844)
at akka.remote.EndpointWriter.writeSend(Endpoint.scala:747)
at akka.remote.EndpointWriter$$anonfun$4.applyOrElse(Endpoint.scala:722)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:415)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment