Created July 4, 2019 12:50
Pub/Sub Akka actor with a ZIO Queue
import zio._
import zio.console._
import zio.{Queue, UIO}
import com.typesafe.config.ConfigFactory
import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext}
import scala.concurrent.duration.FiniteDuration
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.{ExecutorService, Executors, ThreadFactory}
/** Demo that bridges an Akka publish/subscribe pair to a ZIO `Queue`.
  *
  * A [[ActorWithQueue.Publisher]] actor periodically sends a `Message` to every
  * actor that has sent it `Subscribe`. A [[ActorWithQueue.Subscriber]] actor
  * pushes each received payload into a bounded ZIO queue, and `main` forks a ZIO
  * fiber that takes payloads off that queue and prints them.
  */
object ActorWithQueue {

  /** ZIO runtime shared by the subscriber actor and by `main`. */
  implicit val runtime: DefaultRuntime = new DefaultRuntime {}

  // Messages
  case object Subscribe
  case object Publish
  final case class Message(payload: String)

  /** Actor that sends periodic messages to its subscribers.
    *
    * The first `Publish` is scheduled 5 seconds after start; each `Publish`
    * then re-schedules the next one 1 second later.
    */
  class Publisher() extends Actor {
    // Only the dispatcher is needed: it is the implicit ExecutionContext
    // that `scheduleOnce` requires.
    import context.dispatcher

    val scheduler = context.system.scheduler
    var subscribers = Set.empty[ActorRef]

    override def preStart(): Unit = {
      scheduler.scheduleOnce(FiniteDuration(5, TimeUnit.SECONDS), self, Publish)
      ()
    }

    def receive: Receive = {
      case Subscribe =>
        subscribers = subscribers + sender()

      case Publish =>
        val payload = s"""{"data" : "${System.currentTimeMillis()}"}"""
        subscribers.foreach(_ ! Message(payload))
        scheduler.scheduleOnce(FiniteDuration(1, TimeUnit.SECONDS), self, Publish)
        ()
    }
  }

  /** Actor that subscribes to `publisher` and pushes incoming payloads to the ZIO queue.
    *
    * @param publisher actor that receives the `Subscribe` request on start
    * @param queue     ZIO queue that each received payload is offered to
    */
  class Subscriber(publisher: ActorRef, queue: Queue[String]) extends Actor {
    override def preStart(): Unit = publisher ! Subscribe

    def receive: Receive = {
      case Message(payload) =>
        // `queue.offer` only *describes* the effect; nothing is enqueued until
        // it is executed by the runtime.
        val offer: UIO[Boolean] = queue.offer(payload)
        // NOTE(review): on a bounded queue `offer` waits while the queue is
        // full, so this call can block the actor's thread until the consumer
        // catches up.
        runtime.unsafeRun(offer)
        ()
    }
  }

  // Create a custom threadpool and execution context for Akka to use
  val executor: ExecutorService = Executors.newFixedThreadPool(
    4,
    new ThreadFactory() {
      override def newThread(r: Runnable): Thread = {
        val t = Executors.defaultThreadFactory.newThread(r)
        // NOTE(review): assumes the pool threads are meant to be daemon threads
        // so they do not keep the JVM alive on their own - confirm.
        t.setDaemon(true)
        t
      }
    }
  )

  implicit val akkaEc: ExecutionContext = ExecutionContext.fromExecutor(executor)

  // Start the Actor system
  val config = ConfigFactory.load()
  val as = ActorSystem("actorz", defaultExecutionContext = Some(akkaEc), config = Some(config))

  // Start the publisher; the subscriber is started from `main` once the queue exists
  val a1 = as.actorOf(Props(new Publisher), "publisher")

  /** Creates the queue, starts the subscriber actor and forks the queue consumer.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {

    // Takes one payload off the queue (waiting until one is available) and prints it.
    def processElement(q: Queue[String]) =
      q.take.flatMap { payload =>
        putStrLn(s"Processing queue payload: $payload")
      }

    val myAppLogic =
      for {
        q <- Queue.bounded[String](5)
        // Starting the actor is a side effect, so suspend it in the effect
        // rather than running it as a plain value binding.
        _ <- UIO.effectTotal(as.actorOf(Props(new Subscriber(a1, q)), "sub-1"))
        // Consume the queue forever on a separate fiber.
        _ <- processElement(q).forever.fork
      } yield ()

    val program = myAppLogic.fold(_ => 1, _ => 0)

    // Building `program` does nothing by itself; it has to be run.
    runtime.unsafeRun(program)
    ()
  }
}
petomat commented Jul 5, 2019

I am wondering whether it is safe to use the ZIO runtime twice, i.e. ultimately from two different threads. I haven't tested this, but you probably have. Interesting. Perhaps using the runtime twice could evolve into a ZIO test case / spec.

justinhj commented Jul 5, 2019

Not sure whether you mean why I am using different thread pools for ZIO and Akka, or the fork at the end of the main loop?

petomat commented Jul 5, 2019

I meant you are using unsafeRun in the subscriber actor and at the very bottom to run program. I don't really care about thread pools at the moment. (I guess I will instantiate a ZIO runtime with the given akka execution context.) I was just wondering whether the runtime can execute two different ZIO programs (which are sharing a queue) at the same time. I don't know the underlying runtime mechanics.

justinhj commented Jul 5, 2019 via email

