Skip to content

Instantly share code, notes, and snippets.

@codepr
Last active May 5, 2021 21:10
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save codepr/779f5a6fd03c21446cfc to your computer and use it in GitHub Desktop.
Save codepr/779f5a6fd03c21446cfc to your computer and use it in GitHub Desktop.
Basic hello world cluster server
package clusterserver
import akka.actor._
class Logger extends Actor with ActorLogging {
log.info("Logger started!")
def receive = {
case msg => log.info("Got msg: {}", msg)
}
}
name := "cluster-server"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % "2.4.2",
"com.typesafe.akka" %% "akka-cluster" % "2.4.2"
)
package clusterserver
import akka.actor._
import akka.routing._
import akka.cluster._
import akka.cluster.routing._
import akka.io.{ IO, Tcp }
import akka.util.ByteString
import java.net.InetSocketAddress
import com.typesafe.config.ConfigFactory
/**
* Server actor listening for connections on a random free port.
* This must be done in order to run a simulation cluster on a single node
* cause we need to bind our server socket to a different port
* for every instance. In a real distributed system we can select a single
* distinct port and bound every instance to it.
*
* @author: Andrea Giacomo Baldan
* @version: 1.0
* @date: 2016/03/26
*/
class Server(ref: ActorRef) extends Actor with ActorLogging {
import Tcp._
import context.system
IO(Tcp) ! Bind(self, new InetSocketAddress("127.0.0.1", 0)) // port set to 0 means a random non-used port after the 1000th
def receive = {
case b @ Bound(localAddress) =>
println("[*] " + localAddress.getHostString + " listening on port " + localAddress.getPort)
case CommandFailed(_: Bind) => context stop self
case c @ Connected(remote, local) =>
println("[*] Connection received from " + remote.getHostString)
val connection = sender
val handler = context.actorOf(Props(new LoggerHandler(remote, ref)))
connection ! Register(handler)
connection ! Write(ByteString("\n\n[*] Hello from " + local.getHostString + ":" + local.getPort + "\n\n"))
}
}
/**
* Handler class for every incoming client, it's responsible for the entire life-cycle
* of the connection, handle all commands from the user.
* Handle "single", "multiple" and "quit" commands, just for simple cluster testing purpose
*/
class LoggerHandler(reference: InetSocketAddress, ref: ActorRef) extends Actor with ActorLogging {
import Tcp._
val SINGLE = ByteString("single\r\n")
val MULTIPLE = ByteString("multiple\r\n")
val QUIT = ByteString("quit\r\n")
def receive = {
case Received(data) =>
data match {
case SINGLE => ref ! 1
case MULTIPLE => (1 to 10).foreach(i => ref ! i)
case QUIT => self ! PeerClosed
case _ => sender ! Write(ByteString("hello"))
}
case PeerClosed =>
context stop self
}
}
/**
* Set one seed-node at localhost:2500, just for simulation purpose.
* Just play with router settings and round robin settings to change
* behaviour of the router and routees.
*
* Run instructions:
* In order to test the effectively load balance between the two simulated node in a local single node
* we need to start the first instance of the server binding to the tcp port 2500 in order to simulate
* the seed node.
* NODE A(e.g. shell A): sbt run -Dakka.remote.netty.tcp.port=2500
* NODE B(e.g. shell B): sbt run -Dakka.remote.netty.tcp.port=0
*/
object TestSystem extends App {
val config = ConfigFactory.parseString(s"""
akka.actor.provider=akka.cluster.ClusterActorRefProvider
akka.cluster.seed-nodes = ["akka.tcp://DumbSystem@127.0.0.1:2500"]
""")
val system = ActorSystem("DumbSystem", config)
Cluster(system).registerOnMemberUp {
val roundRobinPool = RoundRobinPool(nrOfInstances = 10)
val clusterRoutingSettings = ClusterRouterPoolSettings(totalInstances = 10, maxInstancesPerNode = 5, allowLocalRoutees = true, useRole = None)
val clusterPool = ClusterRouterPool(roundRobinPool, clusterRoutingSettings)
val router = system.actorOf(clusterPool.props(Props[Logger]))
system.actorOf(Props(new Server(router)))
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment