Skip to content

Instantly share code, notes, and snippets.

@fhuertas
Last active February 22, 2019 15:41
Show Gist options
  • Save fhuertas/d171ba87548dc00cde851d5b11c1fd16 to your computer and use it in GitHub Desktop.
Save fhuertas/d171ba87548dc00cde851d5b11c1fd16 to your computer and use it in GitHub Desktop.
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.cluster.singleton.{
ClusterSingletonManager,
ClusterSingletonManagerSettings,
ClusterSingletonProxy,
ClusterSingletonProxySettings
}
import com.typesafe.config.{ConfigFactory, ConfigValueFactory}
import com.typesafe.scalalogging.LazyLogging
import scala.annotation.tailrec
import scala.concurrent.duration._
import scala.util.{Failure, Try}
/** Bootstraps one cluster node: an actor system that hosts the `Consumer` cluster singleton
  * manager, a proxy to that singleton, and a local `Producer` that kicks off the exchange.
  */
object Runner {

  /** Starts a node from the given configuration resource.
    *
    * The resource must define `cluster.name` (used as the actor-system name) and `cluster.port`
    * (the first port to probe). The remoting port is overridden with the result of
    * [[firstOpenPort]] so several nodes can be started on one machine from the same file.
    *
    * @param file name of the configuration resource handed to `ConfigFactory.load`
    */
  def main(file: String): Unit = {
    // Load the resource once and reuse it for both the port lookup and the final config.
    val baseConfig = ConfigFactory.load(file)
    val port = baseConfig.getInt("cluster.port")
    val config = baseConfig
      .withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(firstOpenPort(port)))

    implicit val system: ActorSystem = ActorSystem(config.getString("cluster.name"), config)

    // Manager that runs the single Consumer instance among nodes with the "component" role.
    // Consumer.End is the message the manager sends to the singleton on hand-over.
    system.actorOf(
      ClusterSingletonManager.props(
        singletonProps = Props(classOf[Consumer]),
        terminationMessage = Consumer.End,
        settings = ClusterSingletonManagerSettings(system).withRole("component")),
      name = "consumer"
    )

    // Proxy that routes messages to wherever the singleton currently lives.
    val proxy = system.actorOf(
      ClusterSingletonProxy.props(
        singletonManagerPath = "/user/consumer",
        settings = ClusterSingletonProxySettings(system).withRole("component")),
      name = "consumerProxy"
    )

    val producer = system.actorOf(Props(classOf[Producer], proxy))
    import Consumer._
    producer ! Start
  }

  /** Connect timeout, in milliseconds, used when probing a port. */
  val SocketTimeout = 200

  /** Returns the first port, starting at `port` and counting upwards, on which a TCP connect to
    * `localhost` does not succeed within [[SocketTimeout]] milliseconds.
    *
    * Any non-fatal failure of the probe (connection refused, timeout, ...) is taken to mean that
    * nothing is listening on that port.
    *
    * @param port first port to probe
    * @return the first probed port on which the connect attempt failed
    */
  @tailrec
  def firstOpenPort(port: Int): Int = {
    // Succeeds only when something is already listening on `candidate`.
    def tryPort(candidate: Int): Int = {
      val socket = new java.net.Socket()
      try {
        socket.connect(new java.net.InetSocketAddress("localhost", candidate), SocketTimeout)
        candidate
      } finally {
        // Release the socket even when connect throws, which is the expected outcome for a
        // free port; without this every successful probe leaked a socket.
        socket.close()
      }
    }
    // NOTE(review): a value above 65535 makes InetSocketAddress throw, which is reported here as
    // a "free" port. Callers are assumed to start from a sane base port.
    Try(tryPort(port)) match {
      case Failure(_) => port
      case _ => firstOpenPort(port + 1)
    }
  }
}
/** Entry point that starts a cluster node from the `w1.conf` configuration resource. */
object RunnerMain1 extends App with LazyLogging {
  Runner.main(file = "w1.conf")
}
/** Second entry point, also starting a cluster node from the `w1.conf` configuration resource.
  *
  * `Runner.main` probes upwards from the configured port, so a node started while another one
  * is already listening on that port ends up on a later port.
  * NOTE(review): this reuses the same file as `RunnerMain1` - confirm that is intentional.
  */
object RunnerMain2 extends App with LazyLogging {
  Runner.main(file = "w1.conf")
}
import com.spotahome.underworld.agent.Consumer._
/** Drives the ping/pong exchange.
  *
  * On `Start` it sends the first `Ping` to `consumer`; on every `Pong` it schedules another
  * `Ping` one second later, addressed to whoever sent the `Pong`. Any other message is printed.
  *
  * @param consumer reference that receives the initial `Ping`
  */
class Producer(consumer: ActorRef) extends Actor {
  override def receive: Receive = {
    case Start =>
      println("Start")
      consumer ! Ping
    case Pong =>
      // sender() is evaluated here, on the actor's own thread, and handed to the scheduler as a
      // plain value, so nothing touches actor state when the timer fires.
      // NOTE(review): the reply goes to the Pong's sender rather than to `consumer`; confirm
      // that addressing the replying actor directly is what is wanted here.
      val replyTo = sender()
      import context.dispatcher
      println("Pong")
      // `1.second` avoids the postfix-operator form, which needs scala.language.postfixOps.
      context.system.scheduler.scheduleOnce(1.second, replyTo, Ping)
    case m =>
      println(m)
  }
}
/** Actor run as the cluster singleton.
  *
  * While working it answers every `Ping` with a `Pong` after one second and prints any other
  * message. `End` makes it print "end" and stop.
  */
class Consumer extends Actor {
  println("Consumer started")

  override def receive: Receive = working

  /** Behaviour that silently drops every message. */
  def stopping: Receive = {
    case _ =>
  }

  /** Normal behaviour: handles `End` and `Ping`, prints everything else. */
  def working: Receive = {
    case End =>
      println("end")
      context.become(stopping)
      // End is what the singleton manager sends on hand-over, and the manager then waits for
      // this actor to terminate. Without an explicit stop the hand-over never completes.
      context.stop(self)
    case Ping =>
      println("Ping")
      // Capture the sender now; it is passed to the scheduler as a value and used after the
      // delay without touching actor state from the timer thread.
      val replyTo = sender()
      import context.dispatcher
      // `1.second` avoids the postfix-operator form, which needs scala.language.postfixOps.
      context.system.scheduler.scheduleOnce(1.second, replyTo, Pong)
    case m => println(m)
  }
}
/** Messages exchanged between `Producer` and `Consumer`. */
object Consumer {

  /** Tells the `Producer` to send its first `Ping`. */
  case object Start

  /** Request sent to the `Consumer`; answered with [[Pong]]. */
  case object Ping

  /** Reply to a [[Ping]]. */
  case object Pong

  /** Tells the `Consumer` to finish; used as the singleton termination message. */
  case object End

  /** Not handled by any actor in this file. */
  case object GetCurrent
}
# Application-level keys read by Runner.main: `cluster.name` becomes the actor-system name and
# `cluster.port` is the first port it probes when choosing the remoting port.
cluster.name: "cluster"
cluster.port: 2552
akka {
log-dead-letters = 10
log-dead-letters-during-shutdown = on
loglevel = "INFO"
loggers = ["akka.event.slf4j.Slf4jLogger"]
extensions = ["akka.cluster.client.ClusterClientReceptionist"]
# `hostname` and `domain` are helper values; within this file they are only used by the
# substitution in remote.netty.tcp.hostname below.
hostname = localhost
domain = ""
remote {
log-remote-lifecycle-events = off
netty.tcp {
hostname = ${akka.hostname}${akka.domain}
# Default only: Runner.main overrides akka.remote.netty.tcp.port with the port it selected.
port = ${cluster.port}
}
}
actor {
provider = "akka.cluster.ClusterActorRefProvider"
warn-about-java-serializer-usage = false
debug {
autoreceive = on
lifecycle = on
unhandled = on
}
}
cluster {
# Same role name that Runner passes to the singleton manager and proxy settings.
roles = [component]
# A single seed node: this actor system on localhost at the base cluster port.
seed-nodes = ["akka.tcp://"${cluster.name}"@localhost:"${cluster.port}]
role {
component.min-nr-of-members = 1
}
# NOTE(review): automatic downing is generally discouraged outside demos - confirm before
# reusing this setting elsewhere.
auto-down-unreachable-after = 60s
shutdown-after-unsuccessful-join-seed-nodes = 30s
}
coordinated-shutdown.exit-jvm = on
}
# NOTE(review): presumably requires the akka-management module on the classpath - confirm.
akka.management.http {
route-providers += "akka.management.HealthCheckRoutes"
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment