Pub Sub Akka actor with Zio Queue
import zio._ | |
import zio.console._ | |
import zio.{Queue, UIO} | |
import akka.actor._ | |
import com.typesafe.config.ConfigFactory | |
import java.util.concurrent.Executors | |
import scala.concurrent.{ExecutionContext} | |
import scala.concurrent.duration.FiniteDuration | |
import java.util.concurrent.{Executors, TimeUnit} | |
import java.util.concurrent.{ExecutorService, Executors, ThreadFactory} | |
/** Demo that bridges an Akka publish/subscribe pair to a ZIO queue.
  *
  * A [[ActorWithQueue.Publisher]] actor periodically sends a [[ActorWithQueue.Message]]
  * to every registered subscriber. A [[ActorWithQueue.Subscriber]] actor forwards each
  * payload it receives into a ZIO `Queue[String]`, and a ZIO fiber started from `main`
  * takes elements off that queue and prints them.
  */
object ActorWithQueue {

  /** ZIO runtime shared by the subscriber actor (to offer) and by `main` (to run the program). */
  implicit val runtime: DefaultRuntime = new DefaultRuntime {}

  // Messages

  /** Sent by a subscriber to the publisher to register itself (the sender) for messages. */
  case object Subscribe

  /** Self-sent timer tick telling the publisher to broadcast a new message. */
  case object Publish

  /** A published message carrying a string payload. */
  final case class Message(payload: String)

  /** Actor that sends periodic messages to its subscribers.
    *
    * The first `Publish` tick is scheduled 5 seconds after start; every handled tick
    * schedules the next one 1 second later.
    */
  class Publisher() extends Actor {
    // Brings the actor's implicit dispatcher into scope for `scheduleOnce`.
    import context._

    val scheduler = context.system.scheduler
    var subscribers = Set.empty[ActorRef]

    override def preStart(): Unit =
      scheduler.scheduleOnce(FiniteDuration(5, TimeUnit.SECONDS), self, Publish)

    def receive: Receive = {
      case Subscribe =>
        val subscriber = sender()
        // Watch the subscriber so it can be dropped once it stops; without this the set
        // only ever grows and every tick keeps sending to dead actors.
        context.watch(subscriber)
        subscribers = subscribers + subscriber

      case Terminated(subscriber) =>
        subscribers = subscribers - subscriber

      case Publish =>
        val payload = s"""{"data" : "${System.currentTimeMillis()}"}"""
        subscribers.foreach(_ ! Message(payload))
        // Re-arm the timer for the next broadcast.
        scheduler.scheduleOnce(FiniteDuration(1, TimeUnit.SECONDS), self, Publish)
    }
  }

  /** Actor that subscribes to `publisher` and pushes incoming payloads to the ZIO queue.
    *
    * @param publisher the actor that receives this actor's `Subscribe` request
    * @param queue     the ZIO queue every received payload is offered to
    */
  class Subscriber(publisher: ActorRef, queue: Queue[String]) extends Actor {

    override def preStart(): Unit = publisher ! Subscribe

    def receive: Receive = {
      case Message(payload) =>
        val offer: UIO[Boolean] = queue.offer(payload)
        // Runs the offer synchronously on the actor's thread.
        // NOTE(review): `main` creates the queue with `Queue.bounded`, so this presumably
        // blocks the actor while the queue is full - confirm that back-pressure is intended.
        runtime.unsafeRun(offer)
    }
  }

  // Create a custom thread pool and execution context for Akka to use.
  // The threads are daemons, so this pool by itself does not keep the JVM alive.
  val executor: ExecutorService = Executors.newFixedThreadPool(4, new ThreadFactory() {
    override def newThread(r: Runnable): Thread = {
      val t = Executors.defaultThreadFactory.newThread(r)
      t.setDaemon(true)
      t
    }
  })

  implicit val akkaEc: ExecutionContext = ExecutionContext.fromExecutor(executor)

  // Start the actor system on the custom execution context
  val config = ConfigFactory.load()
  val as: ActorSystem =
    ActorSystem("actorz", defaultExecutionContext = Some(akkaEc), config = Some(config))

  // Start the publisher; the subscriber is created in `main` once the queue exists
  val a1: ActorRef = as.actorOf(Props(new Publisher), "publisher")

  /** Creates the queue, starts subscriber "sub-1" and forks a fiber that prints every
    * payload taken from the queue.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    // Takes one element from the queue (suspending until one is available) and prints it.
    def processElement(q: Queue[String]) =
      q.take.flatMap { payload =>
        putStrLn(s"Processing queue payload: $payload")
      }

    val myAppLogic =
      for {
        q <- Queue.bounded[String](5)
        // Plain value binding: the subscriber is created as a side effect when this step runs.
        _ = as.actorOf(Props(new Subscriber(a1, q)), "sub-1")
        // NOTE(review): the consumer is forked and never joined, so this effect completes
        // right away; processing relies on the fiber outliving `unsafeRun` - confirm this
        // holds for the ZIO version in use.
        _ <- processElement(q).forever.fork
      } yield ()

    // Maps failure to 1 and success to 0; the resulting value is currently discarded.
    val program = myAppLogic.fold(_ => 1, _ => 0)
    runtime.unsafeRun(program)
  }
}
This comment has been minimized.
This comment has been minimized.
Not sure whether you mean why I am using a different thread pool for ZIO and Akka, or the fork at the end of the main loop?
This comment has been minimized.
This comment has been minimized.
petomat
commented
Jul 5, 2019
•
I meant you are using unsafeRun in the subscriber actor and at the very bottom to run program.
This comment has been minimized.
This comment has been minimized.
Ah I see. Yes this is how John described how to do it and I also don’t understand the underlying runtime semantics. :)
… On Jul 5, 2019, at 9:48 AM, Peter Schmitz ***@***.***> wrote:
I meant you are using unsafeRun in the subscriber actor and at the very bottom to run program. I don't really care about thread pools at the moment. (I guess I will instantiate a ZIO runtime with the given akka execution context.) I was just wondering whether the runtime can execute two different ZIO programs (which are sharing a queue) at the same time. I don't know the underlying runtime mechanics.
—
You are receiving this because you authored the thread.
Reply to this email directly, view it on GitHub, or mute the thread.
|
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This comment has been minimized.
petomat commentedJul 5, 2019
I am wondering whether it is safe to use the ZIO runtime twice, i.e. ultimately from two different threads. I haven't tested this, but you probably did. Interesting. Perhaps using the runtime twice could evolve into a ZIO test case / spec.