I've been experimenting with Kotlin and coroutines for programming Akka, and I must say I really like the combination so far.
But before I go into it, some brief preliminaries for those who don't know Akka and actors.
Actors are a programming model that fits cloud native architectures particularly well: they are highly available and scale horizontally, all while embracing the realities of multiple servers collaborating, server instances coming and going, and the network having hiccups.
On the JVM, Akka is the prominent actor framework. It's been around for a while now, and as a result it's highly reliable, well thought out and offers a wide programming ecosystem. My own interest in Akka comes from its suitability for software systems that can only be built with business events as a key construct and thinking model. Materialized views, CQRS and near real-time data streams then play a big role in constructing those systems.
I find Akka's Java API a bit unwieldy. That is partly to be expected, since Java as a programming language still lags quite a bit behind newer languages. It is also because Akka actors have to be non-blocking in their implementation, which means we have to do extra bookkeeping to maintain intermediate state that would otherwise be handled by the JVM stack.
So I thought I'd see what Kotlin and coroutines can do to improve the programming experience, trying to make the programming model feel as much like native Kotlin coroutines as I can.
Our example actor receives requests with a sequence number, keeps a counter of processed messages, does a call to a slow service and sends a response using this data. For this example the actor has to process messages sequentially.
During the call to slowService we can't block the thread, because that would make the actor unresponsive to system messages. Instead, with the Java API we'd have to kick off a Java CompletableFuture, make sure the result is sent to ourselves when it becomes available, and go to a waiting mode where we stash all incoming messages for later processing. When we do receive the result, we retrieve our stored message, send the end result and go back to processing mode.
All perfectly doable, but the required call stack bookkeeping obscures the high-level intent of what we're trying to do.
(You can find the code here.)
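To make that bookkeeping a bit more concrete, here is a rough sketch of what the stash-and-pipe approach could look like with the classic Akka API, written in Kotlin. It is not the code from this demo: the message types, `slowServiceFuture` and `DemoStashActor` are made up for illustration, and it assumes Akka classic is on the classpath.

```kotlin
import akka.actor.AbstractActorWithStash
import akka.actor.ActorRef
import akka.actor.Status
import akka.pattern.Patterns
import java.util.concurrent.CompletableFuture

// Hypothetical protocol types, for illustration only.
data class GetCounter(val seqNr: Int)
data class CounterResponse(val seqNr: Int, val counter: Int)

// Internal message that carries the service result back into the actor.
private data class ServiceResult(val value: Int)

// Hypothetical stand-in for the slow service: echoes the sequence number after a pause.
fun slowServiceFuture(seqNr: Int): CompletableFuture<Int> =
    CompletableFuture.supplyAsync { Thread.sleep(100); seqNr }

class DemoStashActor : AbstractActorWithStash() {
    private var counter = 0

    override fun createReceive(): Receive = processing()

    // Processing mode: start the call, pipe its result to ourselves, switch to waiting.
    private fun processing(): Receive = receiveBuilder()
        .match(GetCounter::class.java) { msg ->
            counter += 1
            val future = slowServiceFuture(msg.seqNr).thenApply { ServiceResult(it) }
            Patterns.pipe(future, context.dispatcher()).to(self)
            context.become(waiting(sender)) // remember who asked
        }
        .build()

    // Waiting mode: stash everything until the piped result (or failure) arrives.
    private fun waiting(replyTo: ActorRef): Receive = receiveBuilder()
        .match(ServiceResult::class.java) { result ->
            replyTo.tell(CounterResponse(result.value, counter), self)
            unstashAll()
            context.become(processing())
        }
        .match(Status.Failure::class.java) { failure ->
            replyTo.tell(failure, self) // a failed future is piped as Status.Failure
            unstashAll()
            context.become(processing())
        }
        .matchAny { stash() }
        .build()
}
```

Even in this small sketch the two modes, the internal result message and the remembered sender are all there just to survive one asynchronous call.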
class DemoKotlinActor : ChannelActor<GetCounter>() {
    init {
        startReceive { // this: ActorScope<GetCounter>
            var counter = 0
            for (msg in channel) { // suspends
                counter += 1
                val response = slowService(msg.seqNr) // suspends
                sender.counterResponse(seqNr = response, counter = counter)
            }
        }
    }
}
By contrast, using Kotlin suspend functions and coroutines, the functionality of our actor is just as easy to read as blocking code. The message that we're currently processing is still in scope after the service call, so we can just use it and the service response to return a result. But it's still non-blocking code. So how does that work?
The ChannelActor<E> is an Akka actor that is also a CoroutineScope, so we can launch a coroutine (think of it as a lightweight thread) in our actor and be confident that the coroutine is stopped when the actor is stopped.
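As a minimal sketch of that idea, using only kotlinx.coroutines: a class that is a `CoroutineScope` owns a `Job` and cancels it when it is stopped. `ScopedWorker` and its `stop()` method are hypothetical stand-ins here; in a real Akka actor the cancellation would presumably happen in `postStop()`.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlin.coroutines.CoroutineContext

// Hypothetical stand-in for an actor that is also a CoroutineScope.
class ScopedWorker : CoroutineScope {
    private val job = SupervisorJob()
    override val coroutineContext: CoroutineContext = job + Dispatchers.Default

    // Cancelling the job cancels every coroutine launched in this scope.
    fun stop() = job.cancel()
}

// Launches a never-ending coroutine, stops the worker, and reports
// whether the coroutine was cancelled along with it.
fun demoScopedWorker(): Boolean = runBlocking {
    val worker = ScopedWorker()
    val child = worker.launch {
        while (true) delay(10) // runs until the scope is cancelled
    }
    delay(50)
    worker.stop()
    child.join()
    child.isCancelled
}
```

Because the coroutine is a child of the worker's job, there is nothing to clean up by hand when the worker goes away.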
When the actor is started, we kick off a coroutine with startReceive to start processing messages. IntelliJ shows next to startReceive that we get a this: ActorScope<GetCounter> to work with. As a result we have a ReceiveChannel<GetCounter> that's the inbox of our actor. (See below for more information on the signatures.) We can maintain the mutable state counter since we have one single coroutine doing the processing. With our for statement we're looping over a suspending iterator. In the left gutter IntelliJ shows a special arrow icon that indicates that our code will suspend there, only to continue when we receive a message. We see the same icon next to the call to slowService, so the processing will suspend until we get a response. Then the actor can return the result and suspend again until the next message comes in.
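The same loop can be tried out with plain kotlinx.coroutines, without any Akka involved. This is only a sketch of the pattern: `fakeSlowService` and `processInbox` are hypothetical names, and the channel plays the role of the actor's inbox.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical slow service: suspends for a while, then echoes the sequence number.
suspend fun fakeSlowService(seqNr: Int): Int {
    delay(20)
    return seqNr
}

// Feeds the sequence numbers into an inbox channel and returns
// (service response, counter) pairs in the order they were processed.
fun processInbox(seqNrs: List<Int>): List<Pair<Int, Int>> = runBlocking {
    val inbox = Channel<Int>(Channel.UNLIMITED)
    val responses = mutableListOf<Pair<Int, Int>>()

    // One single coroutine owns the mutable counter, so no locking is needed.
    val processor = launch {
        var counter = 0
        for (seqNr in inbox) {                      // suspends until a message arrives
            counter += 1
            val response = fakeSlowService(seqNr)   // suspends until the service answers
            responses += response to counter
        }
    }

    seqNrs.forEach { inbox.send(it) }
    inbox.close()      // ends the for loop once the inbox is drained
    processor.join()
    responses
}
```

Calling `processInbox(listOf(10, 20, 30))` yields `[(10, 1), (20, 2), (30, 3)]`: messages are handled strictly one after the other, even though no thread is blocked while waiting.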
But what if our slowService call is too slow? Of course we configured our underlying HTTP client and circuit breaker with a timeout, so our slowService suspend function will throw an exception in that case. In our current actor that exception is unhandled and our processing of messages will be aborted. We decide that we want to return an exception message to our client in that case and continue processing.
class DemoKotlinActor : ChannelActor<GetCounter>() {
    init {
        startReceive { // this: ActorScope<GetCounter>
            var counter = 0
            for (msg in channel) { // suspends
                try {
                    counter += 1
                    val response = slowService(msg.seqNr) // suspends
                    sender.counterResponse(seqNr = response, counter = counter)
                } catch (e: Exception) {
                    sender.respondFailure(e)
                }
            }
        }
    }
}
We can use standard try/catch exception handling for this.
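One thing worth keeping in mind with a broad `catch (e: Exception)` inside a coroutine is that it also catches `CancellationException`, which is how coroutines signal cancellation. Here is a small sketch, with plain kotlinx.coroutines and hypothetical names (`flakyService`, `processWithFailures`), of a loop that reports failures and carries on, but lets cancellation through.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical service that fails for even sequence numbers.
suspend fun flakyService(seqNr: Int): Int {
    delay(10)
    if (seqNr % 2 == 0) throw IllegalStateException("timeout for $seqNr")
    return seqNr
}

// Processes all sequence numbers and returns one reply per message,
// either a success or a failure description.
fun processWithFailures(seqNrs: List<Int>): List<String> = runBlocking {
    val inbox = Channel<Int>(Channel.UNLIMITED)
    seqNrs.forEach { inbox.send(it) }
    inbox.close()

    val replies = mutableListOf<String>()
    for (seqNr in inbox) {
        try {
            replies += "ok ${flakyService(seqNr)}"
        } catch (e: CancellationException) {
            throw e // never swallow cancellation, otherwise the loop cannot be stopped
        } catch (e: Exception) {
            replies += "failed ${e.message}" // report the failure and keep processing
        }
    }
    replies
}
```

With `listOf(1, 2, 3)` as input this returns `[ok 1, failed timeout for 2, ok 3]`, so a failing call no longer aborts the processing of later messages.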
So how do we run our demo actor and see that it works?
fun main() = runAkka { // this: AkkaScope
    val channelActor = actorSystem.demoKotlinActor()
    for (i in 1..10) {
        val response = channelActor.getCounter(i, timeout = 3000) // suspends
        println("sent $i received ${response.seqNr}")
    }
    actorSystem.registerOnTermination { println("actor system terminating") }
    delay(5000) // suspends
}
For testing purposes, runAkka puts an Akka actor system in scope, and we're in a coroutine scope so we can call suspend functions and launch coroutines. With the extension function demoKotlinActor we get a reference to our actor. Next to our call to channelActor.getCounter we see the icon that tells us our code will suspend there until we get a result or it times out.
We use Kotlin extension functions to document the protocol of our demo actor.
suspend fun ActorRef.getCounter(seqNr: Int, timeout: Long = 1000L): CounterResponse =
    ask(GetCounter(seqNr), timeout) // suspends
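I haven't shown how a suspending ask can be built. One plausible way, and this is an assumption rather than necessarily what the demo does, is to wrap the classic `Patterns.ask`, which returns a `CompletionStage`, with the `await()` extension from kotlinx.coroutines. `askSuspending`, `EchoActor` and `demoAsk` are hypothetical names, and Akka classic is assumed to be on the classpath.

```kotlin
import akka.actor.AbstractActor
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.Props
import akka.pattern.Patterns
import kotlinx.coroutines.future.await
import kotlinx.coroutines.runBlocking
import java.time.Duration

// Suspends until the actor replies; the ask fails if the timeout expires first.
suspend fun ActorRef.askSuspending(message: Any, timeoutMillis: Long): Any =
    Patterns.ask(this, message, Duration.ofMillis(timeoutMillis)).await()

// Tiny actor that sends every message straight back, just to have something to ask.
class EchoActor : AbstractActor() {
    override fun createReceive(): Receive =
        receiveBuilder().matchAny { msg -> sender.tell(msg, self) }.build()
}

// Starts an actor system, asks the echo actor once and returns its reply.
fun demoAsk(): Any = runBlocking {
    val system = ActorSystem.create("ask-demo")
    try {
        val echo = system.actorOf(Props.create(EchoActor::class.java))
        echo.askSuspending("ping", 1000)
    } finally {
        system.terminate()
    }
}
```

A typed wrapper like getCounter could then cast the reply to the expected response type.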
Personally I really like how well Akka actors and Kotlin coroutines work together. It makes for a very accessible programming model I find.
This demo uses Akka classic actors. Their programming model is a natural fit with Kotlin coroutine channels.
I'm still chewing on what a Kotlin coroutine version of Akka typed would look like.
In a follow-up post I'll go into the design principles of Kotlin and coroutines and how they work out in this API.
You may wonder why go to all this trouble when there are already actors available as part of Kotlin coroutines.
Kotlin coroutine actors are a great way to manage mutable state in the face of concurrency, but they are confined to the memory of one process. They aren't remotely addressable, do not survive a server reboot and can't collaborate across server instances.
Akka actors on the other hand can live in a cluster of servers and thus be highly available and scale horizontally. They can be persistent and thus are very suitable for CQRS.
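For comparison, this is roughly what such an in-process coroutine actor looks like, closely following the counter example in the Kotlin coroutines documentation. Note that the `actor` builder is marked as obsolete API in kotlinx.coroutines, hence the opt-in; the message types and `demoCounterActor` are illustrative names.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ObsoleteCoroutinesApi
import kotlinx.coroutines.channels.actor
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Messages understood by the counter actor.
sealed class CounterMsg
object Increment : CounterMsg()
class GetCount(val reply: CompletableDeferred<Int>) : CounterMsg()

// The actor confines the mutable counter to one coroutine.
@OptIn(ObsoleteCoroutinesApi::class)
fun CoroutineScope.counterActor() = actor<CounterMsg> {
    var counter = 0
    for (msg in channel) {
        when (msg) {
            is Increment -> counter += 1
            is GetCount -> msg.reply.complete(counter)
        }
    }
}

// Sends 100 increments from concurrent coroutines and then reads the count.
@OptIn(ObsoleteCoroutinesApi::class)
fun demoCounterActor(): Int = runBlocking {
    val counter = counterActor()
    coroutineScope {
        repeat(100) { launch(Dispatchers.Default) { counter.send(Increment) } }
    }
    val reply = CompletableDeferred<Int>()
    counter.send(GetCount(reply))
    val result = reply.await()
    counter.close() // lets the actor coroutine finish
    result
}
```

`demoCounterActor()` returns 100 no matter how the senders interleave, but the actor and its state live and die with this one JVM process.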
The ActorScope type that we get as this is as follows:
interface ActorScope<E> : CoroutineScope {
    val channel: ReceiveChannel<E>
    val sender: ActorRef
}
where ReceiveChannel<E> is a standard Kotlin receive channel. ActorScope extends Kotlin's CoroutineScope so that we can safely launch coroutines.
The startReceive method has the following signature:
fun <E> ChannelActor<E>.startReceive(block: suspend ActorScope<E>.() -> Unit): Unit
It uses a Kotlin function type with receiver, which is idiomatic in Kotlin coroutine programming. The block is suspending, so we can call suspending functions like slowService.
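To show the mechanics of such a signature in isolation, here is a stripped-down sketch with plain kotlinx.coroutines. `MiniActorScope`, `startMiniReceive` and `demoMiniReceive` are hypothetical and much simpler than the real thing; the point is only how a `suspend Scope.() -> Unit` block gets the channel in scope.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlin.coroutines.CoroutineContext

// Hypothetical, reduced version of an actor scope: just the inbox.
interface MiniActorScope<E> : CoroutineScope {
    val channel: ReceiveChannel<E>
}

// Launches a coroutine and runs the block with a MiniActorScope as its receiver.
fun <E> CoroutineScope.startMiniReceive(
    inbox: ReceiveChannel<E>,
    block: suspend MiniActorScope<E>.() -> Unit
): Job = launch {
    val launched: CoroutineScope = this
    val scope = object : MiniActorScope<E> {
        override val coroutineContext: CoroutineContext = launched.coroutineContext
        override val channel: ReceiveChannel<E> = inbox
    }
    scope.block()
}

// Sums three numbers sent through the inbox.
fun demoMiniReceive(): Int = runBlocking {
    val inbox = Channel<Int>(Channel.UNLIMITED)
    var sum = 0
    val job = startMiniReceive<Int>(inbox) {
        for (n in channel) { // channel comes from the receiver, no qualifier needed
            delay(1)         // the block is suspending, so this is allowed
            sum += n
        }
    }
    listOf(1, 2, 3).forEach { inbox.send(it) }
    inbox.close()
    job.join()
    sum
}
```

Inside the block, `channel` reads like a local name, which is what keeps the actor code above so compact.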
Another way of handling the messages would be
class DemoKotlinActor : ChannelActor<GetCounter>() {
    init {
        startReceive {
            var counter = 0
            while (!channel.isClosedForReceive) {
                val msg = channel.receive() // suspends
                counter += 1
                val response = slowService(msg.seqNr)
                sender.counterResponse(seqNr = response, counter = counter)
            }
        }
    }
}
The signature of the runAkka method is as follows:
fun <T> runAkka(block: suspend AkkaScope.() -> T): T
where AkkaScope extends CoroutineScope to allow launching coroutines and provides the actor system:
interface AkkaScope : CoroutineScope {
    val actorSystem: ActorSystem
}