Skip to content

Instantly share code, notes, and snippets.

@justinhj
Created July 4, 2019 12:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save justinhj/02781cb68afced3ee9319d7b73176214 to your computer and use it in GitHub Desktop.
Save justinhj/02781cb68afced3ee9319d7b73176214 to your computer and use it in GitHub Desktop.
Pub Sub Akka actor with Zio Queue
import zio._
import zio.console._
import zio.{Queue, UIO}
import akka.actor._
import com.typesafe.config.ConfigFactory
import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext}
import scala.concurrent.duration.FiniteDuration
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.{ExecutorService, Executors, ThreadFactory}
object ActorWithQueue {
implicit val runtime: DefaultRuntime = new DefaultRuntime {}
// Messages
case object Subscribe
case object Publish
case class Message(payload: String)
// Actor that will send period messages to subscribers
class Publisher() extends Actor {
import context._
val scheduler = context.system.scheduler
var subscribers = Set.empty[ActorRef]
override def preStart(): Unit =
scheduler.scheduleOnce(FiniteDuration(5, TimeUnit.SECONDS), self, Publish)
def receive = {
case Subscribe =>
subscribers = subscribers + sender()
case Publish =>
val payload = s"""{"data" : "${System.currentTimeMillis()}"}"""
for (subscriber <- subscribers) subscriber ! Message(payload)
scheduler.scheduleOnce(FiniteDuration(1, TimeUnit.SECONDS), self, Publish)
}
}
// Actor subscribes to publisher and pushes incoming messages to the Zio queue
class Subscriber(publisher: ActorRef, queue: Queue[String]) extends Actor {
override def preStart() = publisher ! Subscribe
def receive = {
case Message(payload) =>
//println(s"Received: $payload")
val offer: UIO[Boolean] = queue.offer(payload)
runtime.unsafeRun(offer)
}
}
// Create a custom threadpool and execution context for Akka to use
val executor: ExecutorService = Executors.newFixedThreadPool(4, new ThreadFactory() {
override def newThread(r: Runnable): Thread = {
val t = Executors.defaultThreadFactory.newThread(r)
t.setDaemon(true)
t
}
})
implicit val akkaEc: ExecutionContext = ExecutionContext.fromExecutor(executor)
// Start the Actor system
val config = ConfigFactory.load()
val as = ActorSystem("actorz", defaultExecutionContext = Some(akkaEc), config = Some(config))
// Start the publisher and a subscriber
val a1 = as.actorOf(Props(new Publisher), "publisher")
def main(args: Array[String]) : Unit = {
def processElement(q: Queue[String]) =
q.take.flatMap { payload =>
putStrLn(s"Processing queue payload: $payload")
}
val myAppLogic =
for {
q <- Queue.bounded[String](5);
_ = as.actorOf(Props(new Subscriber(a1, q)), "sub-1");
_ <- processElement(q).forever.fork
} yield ()
val program = myAppLogic.fold(_ => 1, _ => 0)
runtime.unsafeRun(program)
}
}
@justinhj
Copy link
Author

justinhj commented Jul 5, 2019

Not sure if you mean why am I using a different threadpool for Zio and Akka, or the fork at the end of the main loop?

@petomat
Copy link

petomat commented Jul 5, 2019

I meant you are using unsafeRun in the subscriber actor and at the very bottom to run program. I don't really care about thread pools at the moment. (I guess I will instantiate a ZIO runtime with the given akka execution context.) I was just wondering whether the runtime can execute two different ZIO programs (which are sharing a queue) at the same time. I don't know the underlying runtime mechanics.

@justinhj
Copy link
Author

justinhj commented Jul 5, 2019 via email

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment