Akka and kotlin coroutines

Akka and Kotlin coroutines: ♡

I've been experimenting with Kotlin and coroutines for programming Akka, and I must say I really like the combination so far.
But before I go into it, some brief preliminaries for those who don't know Akka and actors.

Actors and Akka

Actors are a programming model that fits cloud native architectures particularly well: they can be highly available and scale horizontally, all while embracing the realities of multiple servers collaborating, server instances coming and going, and the network having hiccups.

On the JVM, Akka is the prominent actor framework. It has been around for a while now, and as a result it is highly reliable, well thought out and offers a wide programming ecosystem. My own interest in Akka comes from its suitability for software systems that can only be built with business events as a key construct and thinking model. Materialized views, CQRS and near real-time data streams then play a big role in constructing those systems.

Akka and Java programming; the challenge

I find Akka's Java API a bit unwieldy. That is to be expected, partly because Java as a programming language still lags quite a bit behind newer programming languages, and partly because Akka actors have to be non-blocking in their implementation. Non-blocking code means extra bookkeeping to maintain intermediate state that would otherwise be handled by the JVM stack.
So I thought I'd see what Kotlin and coroutines can do to improve the programming experience, trying to make the programming model feel as much like native Kotlin coroutines as I can.

Akka and Kotlin coroutines demo

Our example actor receives requests with a sequence number, keeps a counter of processed messages, does a call to a slow service and sends a response using this data. For this example the actor has to process messages sequentially.

During the call to slowService we can't block the thread, because that would make the actor unresponsive to system messages. Instead, with the Java API we'd have to kick off the Java CompletableFuture, make sure the result is sent to ourselves when it becomes available, and switch to a waiting mode where we stash all incoming messages for later processing. When we do receive the result, we retrieve our stored message, send the end result and go back to processing mode.
All perfectly doable, but the required call stack bookkeeping obscures the high level intent of what we're trying to do.
(You can find the code here.)
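
To make that bookkeeping concrete, here is a minimal sketch of the same two-mode logic written by hand. It deliberately uses no Akka at all: StashingCounter, tell, drain, the reply callback and the ServiceResult message are hypothetical names invented for this illustration, and the mailbox is a plain single-threaded queue rather than a real actor mailbox.

```kotlin
import java.util.concurrent.CompletableFuture

data class GetCounter(val seqNr: Int)
data class ServiceResult(val value: Int)
data class CounterResponse(val seqNr: Int, val counter: Int)

// Hypothetical stand-in for an actor: a mailbox that we process by hand.
// Not thread-safe; a real actor framework takes care of that part.
class StashingCounter(
    private val slowService: (Int) -> CompletableFuture<Int>,
    private val reply: (CounterResponse) -> Unit
) {
    private val mailbox = ArrayDeque<Any>()
    private val stash = ArrayDeque<GetCounter>()
    private var counter = 0
    private var waiting = false // the "mode" that a call stack would otherwise remember

    fun tell(msg: Any) {
        mailbox.addLast(msg)
    }

    // Process everything currently in the mailbox, one message at a time.
    fun drain() {
        while (mailbox.isNotEmpty()) {
            val msg = mailbox.removeFirst()
            when {
                // Waiting mode: park new requests until the service has answered.
                msg is GetCounter && waiting -> stash.addLast(msg)
                // Processing mode: start the call and send the result to ourselves.
                msg is GetCounter -> {
                    counter += 1
                    waiting = true
                    slowService(msg.seqNr).thenAccept { tell(ServiceResult(it)) }
                }
                // The result arrived: respond, then put stashed requests back in order.
                msg is ServiceResult -> {
                    reply(CounterResponse(msg.value, counter))
                    waiting = false
                    while (stash.isNotEmpty()) mailbox.addFirst(stash.removeLast())
                }
            }
        }
    }
}
```

Even in this stripped-down form the waiting flag, the stash and the extra ServiceResult message exist only to remember where we were in the processing of a single request.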

class DemoKotlinActor: ChannelActor<GetCounter>() {
    init {
        startReceive { // this: ActorScope<GetCounter>
            var counter = 0
            for(msg in channel){ // suspends
                counter += 1

                val response = slowService(msg.seqNr) // suspends

                sender.counterResponse(seqNr = response, counter = counter)
            }
        }
    }
}

By contrast, using Kotlin suspend functions and coroutines, the functionality of our actor is just as easy to read as blocking code. The message that we're currently processing is still in scope after the service call, so we can just use it and the service response to return a result. But it's still non-blocking code. So how does that work?
The ChannelActor<E> is an Akka actor that is also a CoroutineScope, so we can launch a coroutine (think of it as a lightweight thread) in our actor and be confident that the coroutine is stopped when the actor is stopped.
When the actor is started, startReceive kicks off a coroutine to start processing messages. IntelliJ shows next to startReceive that we get a this: ActorScope<GetCounter> to work with. As a result we have a ReceiveChannel<GetCounter> that is the inbox of our actor. (See below for more information on the signatures.) We can keep the mutable state counter in a plain local variable, since a single coroutine does all the processing. With our for statement we're looping over a suspending iterator. In the gutter IntelliJ shows a special arrow icon that indicates that our code will suspend there, only to continue when we receive a message. We see the same icon next to the call to slowService, so the processing will suspend until we get a response. Then the actor can return the result and suspend until the next message comes in.
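
The inbox-as-channel idea can be tried out without Akka. The following is a minimal sketch that assumes only kotlinx.coroutines on the classpath; the inbox channel, the fake slowService and runInboxDemo are hypothetical stand-ins for this illustration, not part of the ChannelActor code.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the remote call; delay() suspends without blocking a thread.
suspend fun slowService(seqNr: Int): Int {
    delay(50)
    return seqNr * 10
}

fun runInboxDemo(): List<String> = runBlocking {
    val inbox = Channel<Int>(Channel.UNLIMITED)
    val log = mutableListOf<String>()

    val worker = launch {
        var counter = 0                        // only this coroutine touches it
        for (seqNr in inbox) {                 // suspends until a message arrives
            counter += 1
            val response = slowService(seqNr)  // suspends; seqNr and counter stay in scope
            log += "seqNr=$seqNr response=$response counter=$counter"
        }
    }

    for (i in 1..3) inbox.send(i)
    inbox.close()   // the for loop ends once the remaining messages are processed
    worker.join()
    log
}
```

Calling runInboxDemo() returns one log line per message, in order. Messages that arrive while the worker is suspended in slowService simply wait in the channel, which is the role the stash plays in the hand-written approach.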
But what if our slowService call is too slow? Of course we configured our underlying HTTP client and circuit breaker with a timeout. Our slowService suspend function will throw an exception in that case. In our current actor that exception is unhandled, so our processing of messages will be aborted. We decide that we want to return an exception message to our client in that case and continue processing.

class DemoKotlinActor: ChannelActor<GetCounter>() {
    init {
        startReceive { // this: ActorScope<GetCounter>
            var counter = 0
            for(msg in channel){ // suspends
                try {
                    counter += 1

                    val response = slowService(msg.seqNr) // suspends

                    sender.counterResponse(seqNr = response, counter = counter)
                }catch (e: Exception){
                    sender.respondFailure(e)
                }
            }
        }
    }
}

We can use standard try catch exception handling for this.
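
One thing to keep in mind with a broad catch in coroutine code: cancellation is also delivered as an exception (CancellationException), so it is usually rethrown rather than reported as a failure. Below is a minimal sketch of that pattern, again assuming only kotlinx.coroutines; flakyService and runFailureDemo are hypothetical names, and the failing call is simulated with a plain IllegalStateException.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical service that fails for one particular request.
suspend fun flakyService(seqNr: Int): Int {
    delay(10)
    if (seqNr == 2) throw IllegalStateException("service timed out for seqNr=$seqNr")
    return seqNr * 10
}

fun runFailureDemo(): List<String> = runBlocking {
    val inbox = Channel<Int>(Channel.UNLIMITED)
    val replies = mutableListOf<String>()

    val worker = launch {
        var counter = 0
        for (seqNr in inbox) {
            try {
                counter += 1
                val response = flakyService(seqNr)
                replies += "ok seqNr=$seqNr response=$response counter=$counter"
            } catch (e: CancellationException) {
                throw e // cancellation is how a coroutine is stopped; don't swallow it
            } catch (e: Exception) {
                // Report the failure and carry on with the next message.
                replies += "failure seqNr=$seqNr: ${e.message}"
            }
        }
    }

    for (i in 1..3) inbox.send(i)
    inbox.close()
    worker.join()
    replies
}
```

The second request produces a failure reply and the third is still processed. As in the actor above, the counter is incremented before the call, so the failed request is counted as well.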

Running our actor

So how do we run our demo actor and see that it works?

fun main() = runAkka { // this: AkkaScope
    val channelActor = actorSystem.demoKotlinActor()

    for(i in 1..10){
        val response = channelActor.getCounter(i, timeout = 3000) // suspends
        println("sent $i received ${response.seqNr}")
    }

    actorSystem.registerOnTermination { println("actor system terminating") }
    delay(5000) // suspends
}

For testing purposes, runAkka gives us an Akka actor system in scope, and we're in a coroutine scope so we can call suspend functions and launch coroutines. With the extension function demoKotlinActor we get a reference to our actor. Next to our call to channelActor.getCounter we see the icon that tells us our code will suspend there until we get a result or it times out.

We use Kotlin extension functions to document the protocol of our demo actor.

suspend fun ActorRef.getCounter(seqNr: Int, timeout: Long = 1000L): CounterResponse =
    ask(GetCounter(seqNr), timeout) // suspends
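
The suspending ask helper itself is not shown here. One plausible way to build such a helper is to wrap a future-returning call and await it, which the sketch below does. It is an assumption, not the actual implementation: Askable and askFuture are hypothetical stand-ins for an ActorRef and its ask pattern, await() comes from kotlinx.coroutines.future, and orTimeout requires Java 9 or later.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit
import kotlinx.coroutines.future.await
import kotlinx.coroutines.runBlocking

data class GetCounter(val seqNr: Int)
data class CounterResponse(val seqNr: Int, val counter: Int)

// Hypothetical stand-in for an ActorRef: anything that answers a message with a future.
fun interface Askable {
    fun askFuture(message: Any): CompletableFuture<Any>
}

// Suspends until the reply arrives or the timeout expires; no thread is blocked meanwhile.
suspend fun Askable.getCounter(seqNr: Int, timeoutMillis: Long = 1000L): CounterResponse =
    askFuture(GetCounter(seqNr))
        .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS) // fails the future on timeout
        .await() as CounterResponse                      // suspends, rethrows failures

// Usage: a fake "actor" that answers immediately with a counter of 1.
fun runAskDemo(): CounterResponse = runBlocking {
    val echo = Askable { msg ->
        CompletableFuture.completedFuture<Any>(CounterResponse((msg as GetCounter).seqNr, 1))
    }
    echo.getCounter(7)
}
```

The extension function keeps the protocol readable at the call site: the caller sees a typed request and a typed response, and the future plumbing stays in one place.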

Conclusion

Personally, I really like how well Akka actors and Kotlin coroutines work together. I find it makes for a very accessible programming model.
This demo uses Akka classic actors, whose programming model is a natural fit with Kotlin coroutine channels.
I'm still chewing on what a Kotlin coroutine version of Akka typed would look like.
In a follow-up post I'll go into the design principles of Kotlin and coroutines and how they work out in this API.

PS: actors

You may wonder why go to all this trouble when there are already actors available as part of Kotlin coroutines.
Kotlin coroutine actors are a great way to manage mutable state in the face of concurrency, but they are confined to the memory of one process. They aren't remotely addressable, do not survive a server reboot and can't collaborate across server instances.
Akka actors, on the other hand, can live in a cluster of servers and thus be highly available and scale horizontally. They can also be persistent, which makes them very suitable for CQRS.

PPS: some supporting information

The this: ActorScope type is as follows

interface ActorScope<E> : CoroutineScope {
    val channel: ReceiveChannel<E>

    val sender: ActorRef
}

where ReceiveChannel<E> is a standard Kotlin receive channel. ActorScope extends Kotlin CoroutineScope so that we can safely launch coroutines.

The startReceive method has the following signature

fun <E> ChannelActor<E>.startReceive(block: suspend ActorScope<E>.() -> Unit): Unit

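It uses a Kotlin function type with receiver, which is idiomatic in Kotlin coroutine programming: inside the block, this is the ActorScope, so channel and sender are directly available. The block is suspending, so we can call suspending functions like slowService.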
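
For readers new to this pattern, here is a minimal sketch of how a builder with such a signature could be written on top of plain kotlinx.coroutines. It is not the actual startReceive implementation: InboxScope and this version of startReceive (an extension on CoroutineScope that takes the channel as a parameter) are hypothetical, and there is no Akka involved.

```kotlin
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical scope, similar in shape to ActorScope but without a sender.
interface InboxScope<E> : CoroutineScope {
    val channel: ReceiveChannel<E>
}

// Launches one coroutine and runs the block with an InboxScope as its receiver.
fun <E> CoroutineScope.startReceive(
    inbox: ReceiveChannel<E>,
    block: suspend InboxScope<E>.() -> Unit
): Job = launch {
    val launchedContext = coroutineContext // context of the coroutine we just launched
    val scope = object : InboxScope<E> {
        override val coroutineContext: CoroutineContext = launchedContext
        override val channel: ReceiveChannel<E> = inbox
    }
    scope.block() // inside the block, `this` is the InboxScope
}

fun runStartReceiveDemo(): List<Int> = runBlocking {
    val inbox = Channel<Int>(Channel.UNLIMITED)
    val seen = mutableListOf<Int>()
    val job = startReceive(inbox) { // this: InboxScope<Int>
        for (msg in channel) seen += msg
    }
    inbox.send(1)
    inbox.send(2)
    inbox.close()
    job.join()
    seen
}
```

Because the scope reuses the context of the launched coroutine, anything the block launches becomes a child of that coroutine and is cancelled together with it.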

Another way of handling the messages would be

class DemoKotlinActor: ChannelActor<GetCounter>() {
    init {
        startReceive {
            var counter = 0
            while(!channel.isClosedForReceive){
                val msg = channel.receive() // suspends
                counter += 1

                val response = slowService(msg.seqNr)

                sender.counterResponse(seqNr = response, counter = counter)
            }
        }
    }
}
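
One caveat with this variant: the channel can be closed after the isClosedForReceive check while receive() is already suspended, and receive() then throws ClosedReceiveChannelException, whereas the for loop simply ends. A possible alternative is receiveCatching(), which reports the closed channel as a result instead of throwing. The sketch below assumes only kotlinx.coroutines; runReceiveCatchingDemo is a hypothetical name and the loop runs outside of any actor.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun runReceiveCatchingDemo(): List<Int> = runBlocking {
    val inbox = Channel<Int>(Channel.UNLIMITED)
    val seen = mutableListOf<Int>()

    val worker = launch {
        while (true) {
            // Suspends; getOrNull() yields null once the channel is closed and drained.
            val msg = inbox.receiveCatching().getOrNull() ?: break
            seen += msg
        }
    }

    for (i in 1..3) inbox.send(i)
    inbox.close()
    worker.join()
    seen
}
```

Messages that were sent before close() are still delivered, so the loop sees all three values before it exits.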

The signature of the runAkka method is as follows

fun <T> runAkka(block: suspend AkkaScope.() -> T): T

where AkkaScope extends CoroutineScope to allow launching coroutines and also provides the actor system:

interface AkkaScope : CoroutineScope {
    val actorSystem: ActorSystem
}