Skip to content

Instantly share code, notes, and snippets.

@brianhsu
Created May 27, 2020 06:06
Show Gist options
  • Save brianhsu/7ec5adabaae8a45e08a0fcf18ecdffa0 to your computer and use it in GitHub Desktop.
Save brianhsu/7ec5adabaae8a45e08a0fcf18ecdffa0 to your computer and use it in GitHub Desktop.
akka.scala
package com.example
import akka.actor.typed.{ActorRef, ActorSystem, Behavior, SupervisorStrategy, Terminated}
import akka.actor.typed.scaladsl.{Behaviors, PoolRouter, Routers}
import com.example.ParentActor._
import com.example.Transformer.{ConvertPlease, GracefulShutdown}
object Transformer {
sealed trait Request
case class ConvertPlease(str: String, replyTo: ActorRef[DispatcherMessage]) extends Request
case object GracefulShutdown extends Request
def apply(): Behavior[Request] =
Behaviors.setup { context =>
Behaviors.receiveMessage {
case ConvertPlease(str, replyTo) =>
Thread.sleep(scala.util.Random.between(1000, 5000))
replyTo ! ConvertedLine(str + s".....converted")
Behaviors.same
case GracefulShutdown =>
println("===> Got GracefulShutdown")
Behaviors.stopped { () =>
println("====> Cleanup")
}
}
}
}
object ParentActor {
sealed trait DispatcherMessage
case class Line(line: String) extends DispatcherMessage
case class ConvertedLine(line: String) extends DispatcherMessage
case object End extends DispatcherMessage
def apply(): Behavior[DispatcherMessage] =
Behaviors.setup { context =>
val pool: PoolRouter[Transformer.Request] = Routers.pool(poolSize = 4)(
Behaviors.supervise(Transformer())
.onFailure[Exception](SupervisorStrategy.restart))
val router = context.spawn(pool, "worker-pool")
Behaviors.receiveMessage {
case Line(line) =>
router ! ConvertPlease(line, context.self)
Behaviors.same
case ConvertedLine(line) =>
println("===> converted: " + line)
Behaviors.same
case End =>
context.children.foreach(x => println(s"End: $x"))
// How to stop all actors in router, and wait all converted line been collected?
// The following does not work, it will only send message to ONE Transform actor, isntead of all.
router ! GracefulShutdown
Behaviors.same
}
}
}
object AkkaQuickstart {
def main(args: Array[String]) {
val greeterMain: ActorSystem[DispatcherMessage] = ActorSystem(ParentActor(), "AkkaQuickStart")
greeterMain ! Line("data 1")
greeterMain ! Line("data 2")
greeterMain ! Line("data 3")
greeterMain ! Line("data 4")
greeterMain ! Line("data 5")
greeterMain ! Line("data 6")
greeterMain ! Line("data 7")
greeterMain ! Line("data 8")
greeterMain ! End
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment