Created
July 4, 2019 12:50
-
-
Save justinhj/02781cb68afced3ee9319d7b73176214 to your computer and use it in GitHub Desktop.
Pub Sub Akka actor with Zio Queue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import zio._ | |
import zio.console._ | |
import zio.{Queue, UIO} | |
import akka.actor._ | |
import com.typesafe.config.ConfigFactory | |
import java.util.concurrent.Executors | |
import scala.concurrent.{ExecutionContext} | |
import scala.concurrent.duration.FiniteDuration | |
import java.util.concurrent.{Executors, TimeUnit} | |
import java.util.concurrent.{ExecutorService, Executors, ThreadFactory} | |
/** Demo that bridges an Akka publish/subscribe pair to a ZIO queue.
  *
  * A [[Publisher]] actor periodically sends a [[Message]] to every subscribed actor.
  * A [[Subscriber]] actor forwards each received payload into a ZIO `Queue[String]`,
  * and `main` forks a fiber that takes payloads from that queue and prints them.
  */
object ActorWithQueue {

  /** ZIO runtime used both by [[Subscriber]] (to run `queue.offer`) and by `main`. */
  implicit val runtime: DefaultRuntime = new DefaultRuntime {}

  // Messages

  /** Sent by an actor to the publisher to register itself (the sender) as a subscriber. */
  case object Subscribe

  /** Self-sent tick that makes the publisher emit one [[Message]] to all subscribers. */
  case object Publish

  /** Payload delivered from the publisher to each subscriber. */
  final case class Message(payload: String)

  /** Actor that sends periodic messages to its subscribers.
    *
    * The first tick is scheduled 5 seconds after start; each handled tick schedules the
    * next one 1 second later.
    */
  class Publisher() extends Actor {
    // Only the dispatcher is needed: it is the implicit ExecutionContext for the scheduler.
    import context.dispatcher

    val scheduler = context.system.scheduler
    var subscribers: Set[ActorRef] = Set.empty[ActorRef]

    // Handle of the tick that is currently scheduled, so it can be cancelled on stop.
    private var pendingTick: Option[akka.actor.Cancellable] = None

    /** Schedules a single `Publish` message to this actor after `delay`. */
    private def scheduleTick(delay: FiniteDuration): Unit =
      pendingTick = Some(scheduler.scheduleOnce(delay, self, Publish))

    override def preStart(): Unit =
      scheduleTick(FiniteDuration(5, TimeUnit.SECONDS))

    // Cancel the outstanding tick so that a stopped actor does not receive a stray Publish
    // and a restarted actor (default hooks call postStop, then preStart) does not end up
    // with two publish loops running against the same ActorRef.
    override def postStop(): Unit = {
      pendingTick.foreach(_.cancel())
      pendingTick = None
    }

    def receive: Receive = {
      case Subscribe =>
        val subscriber = sender()
        // Death watch: lets us drop the subscriber once it stops instead of keeping it forever.
        context.watch(subscriber)
        subscribers += subscriber

      case Terminated(subscriber) =>
        subscribers -= subscriber

      case Publish =>
        val payload = s"""{"data" : "${System.currentTimeMillis()}"}"""
        subscribers.foreach(_ ! Message(payload))
        scheduleTick(FiniteDuration(1, TimeUnit.SECONDS))
    }
  }

  /** Actor that subscribes to `publisher` and pushes incoming payloads to the ZIO queue.
    *
    * @param publisher actor that receives the `Subscribe` request on start
    * @param queue     ZIO queue each received payload is offered to
    */
  class Subscriber(publisher: ActorRef, queue: Queue[String]) extends Actor {
    override def preStart(): Unit = publisher ! Subscribe

    def receive: Receive = {
      case Message(payload) =>
        //println(s"Received: $payload")
        val offer: UIO[Boolean] = queue.offer(payload)
        // NOTE(review): unsafeRun executes the effect synchronously on the actor's thread.
        // If the bounded queue is full this presumably blocks that thread until the consumer
        // takes an element -- confirm against the ZIO Queue documentation.
        runtime.unsafeRun(offer)
    }
  }

  /** Custom fixed pool of 4 daemon threads, wrapped below as Akka's execution context. */
  val executor: ExecutorService = Executors.newFixedThreadPool(4, new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val thread = Executors.defaultThreadFactory().newThread(r)
      thread.setDaemon(true)
      thread
    }
  })

  implicit val akkaEc: ExecutionContext = ExecutionContext.fromExecutor(executor)

  // Start the actor system on the custom execution context
  val config = ConfigFactory.load()
  val as: ActorSystem =
    ActorSystem("actorz", defaultExecutionContext = Some(akkaEc), config = Some(config))

  // Start the publisher; the subscriber is created in main once the queue exists
  val a1: ActorRef = as.actorOf(Props(new Publisher), "publisher")

  /** Creates the queue and the subscriber actor, then forks the queue consumer.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    // Takes one payload from the queue and prints it.
    def processElement(q: Queue[String]) =
      q.take.flatMap(payload => putStrLn(s"Processing queue payload: $payload"))

    val myAppLogic =
      for {
        q <- Queue.bounded[String](5)
        // Plain (non-effect) binding: the actor is created when this step of the program runs.
        _ = as.actorOf(Props(new Subscriber(a1, q)), "sub-1")
        _ <- processElement(q).forever.fork
      } yield ()

    // Maps failure to 1 and success to 0; the resulting code is computed but not used further.
    val program = myAppLogic.fold(_ => 1, _ => 0)
    runtime.unsafeRun(program)
  }
}
Not sure if you mean why am I using a different threadpool for Zio and Akka, or the fork at the end of the main loop?
I meant you are using `unsafeRun` in the subscriber actor and at the very bottom to run `program`. I don't really care about thread pools at the moment. (I guess I will instantiate a ZIO runtime with the given akka execution context.) I was just wondering whether the runtime can execute two different ZIO programs (which are sharing a queue) at the same time. I don't know the underlying runtime mechanics.
Ah I see. Yes this is how John described how to do it and I also don’t understand the underlying runtime semantics. :)
… On Jul 5, 2019, at 9:48 AM, Peter Schmitz ***@***.***> wrote:
I meant you are using unsafeRun in the subscriber actor and at the very bottom to run program. I don't really care about thread pools at the moment. (I guess I will instantiate a ZIO runtime with the given akka execution context.) I was just wondering whether the runtime can execute two different ZIO programs (which are sharing a queue) at the same time. I don't know the underlying runtime mechanics.
—
You are receiving this because you authored the thread.
Reply to this email directly, view it on GitHub, or mute the thread.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I am wondering whether it is safe to use the zio runtime twice, i.e. in two different threads in the end. Haven't tested this, but you probably did. Interesting. Perhaps using runtime twice can evolve into a zio test case / specs specification.