@justinhj
Created July 4, 2019 12:50
Pub Sub Akka actor with Zio Queue
import zio._
import zio.console._
import akka.actor._
import com.typesafe.config.ConfigFactory
import java.util.concurrent.{ExecutorService, Executors, ThreadFactory, TimeUnit}
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration

object ActorWithQueue {

  // A single ZIO runtime, shared by the Subscriber actor and by main
  implicit val runtime: DefaultRuntime = new DefaultRuntime {}

  // Messages
  case object Subscribe
  case object Publish
  case class Message(payload: String)

  // Actor that sends periodic messages to its subscribers
  class Publisher() extends Actor {
    // The scheduler needs an implicit ExecutionContext; use the actor's dispatcher
    import context.dispatcher

    val scheduler = context.system.scheduler
    var subscribers = Set.empty[ActorRef]

    // Send the first Publish to ourselves five seconds after start-up
    override def preStart(): Unit =
      scheduler.scheduleOnce(FiniteDuration(5, TimeUnit.SECONDS), self, Publish)

    def receive = {
      case Subscribe =>
        subscribers = subscribers + sender()
      case Publish =>
        val payload = s"""{"data" : "${System.currentTimeMillis()}"}"""
        for (subscriber <- subscribers) subscriber ! Message(payload)
        // Re-arm the timer so that we publish roughly once per second
        scheduler.scheduleOnce(FiniteDuration(1, TimeUnit.SECONDS), self, Publish)
    }
  }

  // Actor that subscribes to the publisher and pushes incoming messages to the ZIO queue
  class Subscriber(publisher: ActorRef, queue: Queue[String]) extends Actor {
    override def preStart(): Unit = publisher ! Subscribe

    def receive = {
      case Message(payload) =>
        //println(s"Received: $payload")
        val offer: UIO[Boolean] = queue.offer(payload)
        // unsafeRun blocks this actor's thread until the offer completes;
        // with a bounded queue that means waiting while the queue is full
        runtime.unsafeRun(offer)
    }
  }

  // Create a custom thread pool (daemon threads) and execution context for Akka to use
  val executor: ExecutorService = Executors.newFixedThreadPool(4, new ThreadFactory() {
    override def newThread(r: Runnable): Thread = {
      val t = Executors.defaultThreadFactory.newThread(r)
      t.setDaemon(true)
      t
    }
  })

  implicit val akkaEc: ExecutionContext = ExecutionContext.fromExecutor(executor)

  // Start the actor system
  val config = ConfigFactory.load()
  val as = ActorSystem("actorz", defaultExecutionContext = Some(akkaEc), config = Some(config))

  // Start the publisher; the subscriber is created in main, once the queue exists
  val a1 = as.actorOf(Props(new Publisher), "publisher")

  def main(args: Array[String]): Unit = {

    // Take one payload from the queue and print it
    def processElement(q: Queue[String]) =
      q.take.flatMap { payload =>
        putStrLn(s"Processing queue payload: $payload")
      }

    val myAppLogic =
      for {
        q <- Queue.bounded[String](5)
        _ = as.actorOf(Props(new Subscriber(a1, q)), "sub-1")
        // Consume the queue forever on a forked fiber so that main does not block here
        _ <- processElement(q).forever.fork
      } yield ()

    val program = myAppLogic.fold(_ => 1, _ => 0)

    runtime.unsafeRun(program)
  }
}
@petomat

petomat commented Jul 5, 2019

I am wondering whether it is safe to use the ZIO runtime twice, i.e. from two different threads in the end. I haven't tested this, but you probably did. Interesting. Perhaps using the runtime twice could evolve into a ZIO test case or spec.

@justinhj
Author

justinhj commented Jul 5, 2019

Not sure whether you are asking why I am using different thread pools for ZIO and Akka, or about the fork at the end of the main loop?

@petomat

petomat commented Jul 5, 2019

I meant that you are using unsafeRun both in the subscriber actor and at the very bottom to run program. I don't really care about thread pools at the moment. (I guess I will instantiate a ZIO runtime with the given Akka execution context.) I was just wondering whether the runtime can execute two different ZIO programs (which share a queue) at the same time. I don't know the underlying runtime mechanics.
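
The scenario raised here, two unsafeRun calls on one runtime from different threads sharing a queue, can be tried in isolation. The following is only a minimal sketch, not a statement about the runtime's guarantees. It assumes the same pre-1.0 ZIO API the gist uses (DefaultRuntime, Queue.bounded); the object name SharedRuntimeSketch, the method demo, and the plain Thread standing in for the actor are hypothetical.

```scala
import zio.{DefaultRuntime, Queue}

object SharedRuntimeSketch {
  // One runtime shared by every caller, as in the gist
  val runtime: DefaultRuntime = new DefaultRuntime {}

  def demo(): List[String] = {
    // Create the queue with one unsafeRun call; it is then shared by both sides
    val queue: Queue[String] = runtime.unsafeRun(Queue.bounded[String](5))

    // Producer: a plain JVM thread standing in for the actor's receive method
    // (hypothetical stand-in, not part of the gist)
    val producer = new Thread(new Runnable {
      def run(): Unit =
        List("a", "b", "c").foreach(p => runtime.unsafeRun(queue.offer(p)))
    })
    producer.start()

    // Consumer: a second unsafeRun on the calling thread, while the producer is running
    val taken = runtime.unsafeRun(
      for {
        x <- queue.take
        y <- queue.take
        z <- queue.take
      } yield List(x, y, z)
    )

    producer.join()
    taken
  }
}
```

Calling SharedRuntimeSketch.demo() should return the three payloads in the order they were offered, since a single producer offers them one after another and the queue is first-in, first-out.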

@justinhj
Author

justinhj commented Jul 5, 2019 via email