Last active
February 22, 2019 15:41
-
-
Save fhuertas/d171ba87548dc00cde851d5b11c1fd16 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor.{Actor, ActorRef, ActorSystem, Props} | |
import akka.cluster.singleton.{ | |
ClusterSingletonManager, | |
ClusterSingletonManagerSettings, | |
ClusterSingletonProxy, | |
ClusterSingletonProxySettings | |
} | |
import com.typesafe.config.{ConfigFactory, ConfigValueFactory} | |
import com.typesafe.scalalogging.LazyLogging | |
import scala.annotation.tailrec | |
import scala.concurrent.duration._ | |
import scala.util.{Failure, Try} | |
object Runner { | |
def main(file: String): Unit = { | |
val port = ConfigFactory.load(file).getInt("cluster.port") | |
val config = ConfigFactory | |
.load(file) | |
.withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(firstOpenPort(port))) | |
ConfigFactory.load() | |
implicit val system: ActorSystem = ActorSystem(config.getString("cluster.name"), config) | |
system.actorOf( | |
ClusterSingletonManager.props( | |
singletonProps = Props(classOf[Consumer]), | |
terminationMessage = Consumer.End, | |
settings = ClusterSingletonManagerSettings(system).withRole("component")), | |
name = "consumer" | |
) | |
val proxy = system.actorOf( | |
ClusterSingletonProxy.props( | |
singletonManagerPath = "/user/consumer", | |
settings = ClusterSingletonProxySettings(system).withRole("component")), | |
name = "consumerProxy" | |
) | |
val producer = system.actorOf(Props(classOf[Producer], proxy)) | |
import Consumer._ | |
producer ! Start | |
} | |
val SocketTimeout = 200 | |
@tailrec | |
def firstOpenPort(port: Int): Int = { | |
def tryPort(port: Int): Int = { | |
val socket = new java.net.Socket() | |
socket.connect(new java.net.InetSocketAddress("localhost", port), SocketTimeout) | |
socket.close() | |
port | |
} | |
Try(tryPort(port)) match { | |
case Failure(_) => port | |
case _ => firstOpenPort(port + 1) | |
} | |
} | |
} | |
object RunnerMain1 extends App with LazyLogging { | |
Runner.main("w1.conf") | |
} | |
object RunnerMain2 extends App with LazyLogging { | |
Runner.main("w1.conf") | |
} | |
import com.spotahome.underworld.agent.Consumer._ | |
class Producer(consumer: ActorRef) extends Actor { | |
override def receive: Receive = { | |
case Start => | |
println("Start") | |
consumer ! Ping | |
case Pong => | |
val s = sender() | |
import context.dispatcher | |
println("Pong") | |
context.system.scheduler.scheduleOnce(1 second)({ s ! Ping }) | |
case m => | |
println(m) | |
} | |
} | |
class Consumer extends Actor { | |
println("Consumer started") | |
override def receive: Receive = working | |
def stopping: Receive = { | |
case _ => | |
} | |
def working: Receive = { | |
case End ⇒ | |
println("end") | |
context.become(stopping) | |
case Ping ⇒ | |
println("Ping") | |
val s = sender() | |
import context.dispatcher | |
context.system.scheduler.scheduleOnce(1 second)({ s ! Pong }) | |
case m => println(m) | |
} | |
} | |
object Consumer { | |
case object Start | |
case object End | |
case object GetCurrent | |
case object Ping | |
case object Pong | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
cluster.name: "cluster" | |
cluster.port: 2552 | |
akka { | |
log-dead-letters = 10 | |
log-dead-letters-during-shutdown = on | |
loglevel = "INFO" | |
loggers = ["akka.event.slf4j.Slf4jLogger"] | |
extensions = ["akka.cluster.client.ClusterClientReceptionist"] | |
hostname = localhost | |
domain = "" | |
remote { | |
log-remote-lifecycle-events = off | |
netty.tcp { | |
hostname = ${akka.hostname}${akka.domain} | |
port = ${cluster.port} | |
} | |
} | |
actor { | |
provider = "akka.cluster.ClusterActorRefProvider" | |
warn-about-java-serializer-usage = false | |
debug { | |
autoreceive = on | |
lifecycle = on | |
unhandled = on | |
} | |
} | |
cluster { | |
roles = [component] | |
seed-nodes = ["akka.tcp://"${cluster.name}"@localhost:"${cluster.port}] | |
role { | |
component.min-nr-of-members = 1 | |
} | |
auto-down-unreachable-after = 60s | |
shutdown-after-unsuccessful-join-seed-nodes = 30s | |
} | |
coordinated-shutdown.exit-jvm = on | |
} | |
akka.management.http { | |
route-providers += "akka.management.HealthCheckRoutes" | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment