Skip to content

Instantly share code, notes, and snippets.

@luben
Last active April 24, 2018 13:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save luben/b78043f54708e746c5e32cd466b36af0 to your computer and use it in GitHub Desktop.
Save luben/b78043f54708e746c5e32cd466b36af0 to your computer and use it in GitHub Desktop.
Akka-Artery-TCP-leak.scala
import com.typesafe.config._
import akka.actor._
import scala.util.Random
/** Closed set of messages exchanged between the client and server actors. */
sealed trait Msg

/** Request that the server answers with [[Pong]]. */
case object Ping extends Msg

/** Reply sent back to whoever sent a [[Ping]]. */
case object Pong extends Msg

/** Request that makes the server spawn a throw-away worker which answers on its behalf. */
case object Leak extends Msg
/**
 * Server-side entry point of the reproducer.
 *
 *  - `Ping` is answered directly with `Pong`.
 *  - `Leak` creates a fresh `WorkerActor` through the actor system (so it is a
 *    top-level user actor, not a child of this actor) and hands it a `Ping`
 *    whose sender is the original requester, so the worker replies to that
 *    requester instead of to this actor.
 */
class ServerActor extends Actor {

  def receive = {
    case Ping =>
      sender() ! Pong

    case Leak =>
      val requester = sender()
      val worker = context.system.actorOf(Props[WorkerActor])
      // Keep the original requester as the sender so the worker's Pong bypasses this actor.
      worker.tell(Ping, requester)
  }
}
/**
 * Short-lived actor that handles exactly one `Ping`.
 *
 * On `Ping` it stores a 1024*1024-element zeroed byte array in `arr`,
 * replies `Pong` to the sender of the `Ping`, and then stops itself.
 */
class WorkerActor extends Actor {

  // Left unset until the first Ping arrives; holds the allocated buffer afterwards.
  var arr: Array[Byte] = _

  def receive = {
    case Ping =>
      // A freshly allocated JVM byte array is already zero-filled.
      arr = new Array[Byte](1024 * 1024)
      sender() ! Pong
      context.stop(self)
  }
}
/**
 * Client that resolves the remote server actor, sends it a single message and
 * shuts its own actor system down once a `Pong` comes back.
 *
 * Flow:
 *  1. `preStart` sends an `Identify` request to the actor selection at `serverPath`.
 *  2. When the matching `ActorIdentity` carries a reference, `msg` is sent to it.
 *  3. When `Pong` arrives, the actor stops itself and terminates the actor system.
 *
 * If the identity reply carries no reference, only a warning is logged; the
 * actor system is NOT terminated in that case.
 *
 * @param msg        message sent to the server once it has been identified
 * @param serverPath actor path of the server actor to look up; defaults to the
 *                   address the reproducer was originally hard-wired to
 */
class ClientActor(
    msg: Msg,
    serverPath: String = "akka://Server@127.0.1.1:2552/user/server"
) extends Actor with ActorLogging {

  // Correlation id used to recognise the ActorIdentity reply to our own Identify request.
  val identifyId = new Random().nextLong()

  override def preStart(): Unit =
    context.actorSelection(serverPath) ! Identify(identifyId)

  def receive = {
    case ActorIdentity(`identifyId`, Some(actorRef)) =>
      actorRef ! msg

    case ActorIdentity(`identifyId`, None) =>
      log.warning("Server not discovered")

    case Pong =>
      context.stop(self)
      context.system.terminate()
  }
}
/** Configuration variants used by the clients and the two server flavours. */
object Cfg {

  /**
   * Base configuration: DEBUG logging, remote actor provider, Artery over TCP
   * on port 0, and both compression table maxima set to 256. Falls back to
   * whatever `ConfigFactory.load()` provides.
   */
  def config: Config = {
    val base = ConfigFactory.parseString("""
      | akka {
      | loglevel = "DEBUG"
      | actor.provider = remote
      | remote.artery {
      | enabled = on
      | transport = tcp
      | canonical.port = 0
      | advanced.compression {
      | actor-refs.max = 256
      | manifests.max = 256
      | }
      | }
      | }
      |""".stripMargin)
    base.withFallback(ConfigFactory.load())
  }

  /** Base configuration with the Artery canonical port pinned to 2552. */
  def leakyServerConfig: Config = {
    val fixedPort =
      ConfigFactory.parseString("akka.remote.artery.canonical.port = 2552")
    fixedPort.withFallback(config)
  }

  /**
   * Base configuration with the Artery canonical port pinned to 2552 and both
   * compression table maxima (`actor-refs.max`, `manifests.max`) overridden to 0.
   */
  def nonLeakyServerConfig: Config = {
    val overrides = ConfigFactory.parseString("""
      | akka.remote.artery {
      | canonical.port = 2552
      | advanced {
      | compression {
      | actor-refs.max = 0
      | manifests.max = 0
      | }
      | }
      | }
      |""".stripMargin)
    overrides.withFallback(config)
  }
}
/**
 * Runnable server template: starts an actor system named "Server" with the
 * configuration supplied by the implementing object and registers a
 * `ServerActor` under the name "server".
 */
trait Server {

  /** Configuration the server's actor system is started with. */
  def cfg: Config

  /** Program entry point; `args` is not used. */
  def main(args: Array[String]): Unit = {
    val serverSystem = ActorSystem("Server", cfg)
    serverSystem.actorOf(Props[ServerActor], name = "server")
  }
}
/** Server entry point started with `Cfg.leakyServerConfig`. */
object LeakyServer extends Server {
  override val cfg = Cfg.leakyServerConfig
}
/** Server entry point started with `Cfg.nonLeakyServerConfig`. */
object NonLeakyServer extends Server {
  override val cfg = Cfg.nonLeakyServerConfig
}
/**
 * Runnable client template. In an endless loop it starts a new actor system
 * named "Client", creates a `ClientActor` that sends `msg`, and blocks until
 * that actor system has terminated before starting the next round.
 */
trait Client {
  import scala.concurrent.Await
  import scala.concurrent.duration.Duration

  /** Message every spawned `ClientActor` sends to the server. */
  val msg: Msg

  /** Program entry point; `args` is not used. The loop never exits on its own. */
  def main(args: Array[String]): Unit = {
    while (true) {
      val clientSystem = ActorSystem("Client", Cfg.config)
      clientSystem.actorOf(Props(new ClientActor(msg)), name = "client")
      // Wait without a timeout for this round's actor system to shut down.
      Await.result(clientSystem.whenTerminated, Duration.Inf)
    }
  }
}
/** Client entry point that repeatedly sends `Ping` to the server. */
object PingClient extends Client {
  override val msg = Ping
}
/** Client entry point that repeatedly sends `Leak` to the server. */
object LeakClient extends Client {
  override val msg = Leak
}
// sbt build definition for the reproducer project.
name := "Test"

version := "0.1"

scalaVersion := "2.12.4"

// Only akka-remote is declared explicitly.
libraryDependencies += "com.typesafe.akka" %% "akka-remote" % "2.5.12"
...
[04/24/2018 12:36:01.220] [Server-akka.remote.default-remote-dispatcher-4] [RestartWithBackoffFlow(akka://Server)] Restarting graph in 1059358224 nanoseconds
[INFO] [04/24/2018 12:36:01.220] [Server-akka.actor.default-dispatcher-935] [akka.remote.artery.Association(akka://Server)] Stopping idle outbound control stream to [akka://Client@127.0.1.1:39711]
[WARN] [04/24/2018 12:36:01.220] [Server-akka.remote.default-remote-dispatcher-4] [akka.stream.Log(akka://Server/system/StreamSupervisor-0)] [outbound connection to [akka://Client@127.0.1.1:39711], control stream] Upstream failed, cause: Association$OutboundStreamStopIdleSignal$:
[DEBUG] [04/24/2018 12:36:01.220] [Server-akka.remote.default-remote-dispatcher-4] [RestartWithBackoffFlow(akka://Server)] Last restart attempt was more than 1000 milliseconds ago, resetting restart count
[DEBUG] [04/24/2018 12:36:01.220] [Server-akka.remote.default-remote-dispatcher-4] [RestartWithBackoffFlow(akka://Server)] Graph in finished
[DEBUG] [04/24/2018 12:36:01.220] [Server-akka.remote.default-remote-dispatcher-4] [RestartWithBackoffFlow(akka://Server)] Restarting graph in 1045707894 nanoseconds
[INFO] [04/24/2018 12:36:01.389] [Server-akka.actor.default-dispatcher-936] [akka.remote.artery.Association(akka://Server)] Stopping idle outbound control stream to [akka://Client@127.0.1.1:45051]
[WARN] [04/24/2018 12:36:01.389] [Server-akka.remote.default-remote-dispatcher-5] [akka.stream.Log(akka://Server/system/StreamSupervisor-0)] [outbound connection to [akka://Client@127.0.1.1:45051], control stream] Upstream failed, cause: Association$OutboundStreamStopIdleSignal$:
[DEBUG] [04/24/2018 12:36:01.390] [Server-akka.remote.default-remote-dispatcher-5] [RestartWithBackoffFlow(akka://Server)] Last restart attempt was more than 1000 milliseconds ago, resetting restart count
[DEBUG] [04/24/2018 12:36:01.390] [Server-akka.remote.default-remote-dispatcher-5] [RestartWithBackoffFlow(akka://Server)] Graph in finished
[DEBUG] [04/24/2018 12:30:59.310] [Server-akka.remote.default-remote-dispatcher-4] [RestartWithBackoffFlow(akka://Server)] Restarting graph in 1038505711 nanoseconds
[INFO] [04/24/2018 12:30:59.379] [Server-akka.actor.default-dispatcher-926] [akka.remote.artery.Association(akka://Server)] Stopping idle outbound control stream to [akka://Client@127.0.1.1:41567]
[WARN] [04/24/2018 12:30:59.379] [Server-akka.remote.default-remote-dispatcher-4] [akka.stream.Log(akka://Server/system/StreamSupervisor-0)] [outbound connection to [akka://Client@127.0.1.1:41567], control stream] Upstream failed, cause: Association$OutboundStreamStopIdleSignal$:
[DEBUG] [04/24/2018 12:30:59.379] [Server-akka.remote.default-remote-dispatcher-4] [RestartWithBackoffFlow(akka://Server)] Last restart attempt was more than 1000 milliseconds ago, resetting restart count
[DEBUG] [04/24/2018 12:30:59.379] [Server-akka.remote.default-remote-dispatcher-4] [RestartWithBackoffFlow(akka://Server)] Graph in finished
[DEBUG] [04/24/2018 12:30:59.379] [Server-akka.remote.default-remote-dispatcher-4] [RestartWithBackoffFlow(akka://Server)] Restarting graph in 1032497280 nanoseconds
...
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment