Skip to content

Instantly share code, notes, and snippets.

@seanabraham
Created June 22, 2020 22:48
Show Gist options
  • Save seanabraham/3a338e117f7d12297dccca4dbc669e8a to your computer and use it in GitHub Desktop.
Save seanabraham/3a338e117f7d12297dccca4dbc669e8a to your computer and use it in GitHub Desktop.
Some kotlin coroutine exception handling
@file:OptIn(ExperimentalTime::class, ExperimentalCoroutinesApi::class)
package me.seanabraham.coroutine.playground
import kotlinx.coroutines.*
import mu.KotlinLogging
import kotlin.random.Random
import kotlin.random.nextInt
import kotlin.time.Duration
import kotlin.time.ExperimentalTime
import kotlin.time.TimeSource
import kotlin.time.milliseconds
// Logger shared by main() and the emitResponse/emitServerError helpers in this file.
val log = KotlinLogging.logger {}
// Assume this is all internal framework handling, coroutine launched on calling thread from request handling pool
/**
 * Demo entry point: runs every request-handling strategy against the same [ServiceRequest] and
 * prints the outcome and elapsed time of each one.
 *
 * Each handler is started with `async` on a dedicated request scope and awaited before the next
 * one starts, so the timing reported for a case covers only that case.
 */
fun main() {
    runBlocking {
        val exceptionHandler = CoroutineExceptionHandler { _, throwable ->
            log.error(throwable) { "Caught $throwable in the CoroutineExceptionHandler" }
        }
        // NOTE(review): kotlinx.coroutines documents that `async` reports failures through `await()`
        // rather than through a CoroutineExceptionHandler, so this handler is presumably only a
        // safety net here and the try/catch below is what actually reports failures — confirm.

        // Parent the supervisor to the runBlocking job so the request scope stays inside structured concurrency.
        // checkNotNull gives a descriptive failure instead of a bare NullPointerException from `!!`.
        val parentJob = checkNotNull(coroutineContext[Job]) { "runBlocking context is missing its Job" }
        val supervisor = SupervisorJob(parentJob)
        val requestScope = CoroutineScope(Dispatchers.Default + supervisor + exceptionHandler)
        val incomingRequest = ServiceRequest(1, 20)
        val requestHandlers = mapOf(
            "Propogate client exception" to ::handleRequest,
            "Wrap exception from service handling code" to ::handleRequestWrapException,
            "Return a default value for the entire request" to ::handleRequestDefaultValue,
            "Return a default value for individually failed downstream requests" to ::handleRequestIndividualDefaultValue
        )
        // Simulate multiple responses to the different handlers to observe behavior (intentionally synchronous and
        // sequential to capture individual timings for demonstration, in practice it would not work like this)
        requestHandlers.forEach { (caseName, requestHandler) ->
            val start = TimeSource.Monotonic.markNow()
            val response = requestScope.async {
                requestHandler.invoke(incomingRequest)
            }
            println("$caseName:")
            println("========================================================")
            try {
                emitResponse(response.await(), start.elapsedNow())
            } catch (e: Exception) {
                emitServerError(e, start.elapsedNow())
            }
            println("--------------------------------------------------------\n")
        }
        // The supervisor is a child of the runBlocking job and never completes on its own;
        // completing it explicitly lets runBlocking return once all cases have been processed.
        supervisor.complete()
    }
}
/**
 * Logs the successful outcome of a request as a simulated HTTP 200 response.
 *
 * @param response result whose `data` map is included in the log line
 * @param duration time spent handling the request; reported in milliseconds
 */
fun emitResponse(response: ServiceResponse, duration: Duration) {
    // Lambda form defers building the message until it is needed, matching emitServerError.
    log.info { "Responding with: HTTP 200 after ${duration.inMilliseconds}ms. Payload ${response.data}" }
    // In practice we'd actually respond to the request if it was real
}
/**
 * Logs a failed request as a simulated HTTP 500 response, attaching the exception to the log entry.
 *
 * @param exception failure that terminated the request
 * @param duration time spent on the request before it failed; reported in milliseconds
 */
fun emitServerError(exception: Exception, duration: Duration): Unit =
    log.error(exception) { "Responding with: HTTP 500 after ${duration.inMilliseconds}ms" }
/**
 * Simulated database client used by the request handlers in this file.
 */
class DBClient {
    /**
     * Simulates a single lookup.
     *
     * A request with id 15 waits 300 ms and then fails; any other id waits a random
     * 1000..2000 ms and returns a response whose data is `id * 10.0`.
     *
     * @param request lookup to perform
     * @return response carrying the request id and its computed value
     * @throws DbClientException when the request id is 15
     */
    suspend fun makeRequest(request: DBRequest): DBResponse {
        val id = request.id
        return when (id) {
            15 -> {
                delay(300)
                throw DbClientException("Request with id ${request.id} failed")
            }
            else -> {
                val simulatedLatency = Random.nextInt(1000..2000).milliseconds
                delay(simulatedLatency)
                DBResponse(id, id * 10.0)
            }
        }
    }
}
/** Incoming service request; the handlers issue one DB lookup per id in `startRange..endRange` (inclusive). */
data class ServiceRequest(val startRange: Int, val endRange: Int)
/** Single lookup sent to [DBClient], identified by [id]. */
data class DBRequest(val id: Int)
/** Result of one [DBRequest]: the request [id] and the value produced for it. */
data class DBResponse(val id: Int, val data: Double)
/** Response returned by the request handlers: values keyed by request id. */
data class ServiceResponse(val data: Map<Int, Double>)
/** Failure raised by [DBClient.makeRequest]. */
class DbClientException(message: String) : RuntimeException(message)
/** Service-level failure that carries additional context and the underlying [cause]. */
class ServiceException(message: String, cause: Throwable?) : RuntimeException(message, cause)
// Single client instance shared by all request handlers below.
val dbClient = DBClient()
// Below is what a service developer might implement when handling a request
// -------------------------------------------------------------------------------------------------------- //
// In the examples below, the entire body of the suspending function is wrapped in a "supervisorScope" call
// This is mostly for the purpose of launching other coroutines (`async` below) but also for defining
// the structure with which they launch (i.e. cancelling or throwing an exception from the outer scope will cancel all
// of the coroutines launched within it to prevent leaks).
// A supervisorScope is used for flexibility as the failure of a child does not necessarily cancel other children (i.e.
// we can catch the exception and swallow it or rethrow from the outer scope to actually cancel them).
// If we wanted to automatically cancel all children and the scope on the failure of a single one we could use a
// coroutineScope instead
// In this first example, we will simply propagate any exception that may be thrown by the underlying DbClient
// by not catching it. If there's no reason to add special handling and the underlying exception is descriptive enough
// this is a reasonable path forward.
/**
 * Handles a request by letting any failure from the DB client propagate unchanged.
 *
 * Issues one DB lookup per id in the inclusive request range, waits for all of them and
 * returns the values keyed by id in ascending id order.
 *
 * @param request range of ids to look up
 * @return values of every lookup, sorted by id
 * @throws RuntimeException if the range starts below zero or ends before it starts
 */
suspend fun handleRequest(request: ServiceRequest): ServiceResponse = supervisorScope {
    val rangeIsValid = request.startRange >= 0 && request.endRange >= request.startRange
    if (!rangeIsValid) {
        throw RuntimeException("Invalid request range: [${request.startRange}, ${request.endRange}]")
    }
    // Very simplified and dumb example of making one request per ID
    // In practice we obviously would rarely, if ever, want to do this exactly
    val pendingLookups = (request.startRange..request.endRange).map { id ->
        async { dbClient.makeRequest(DBRequest(id)) }
    }
    val dataById = pendingLookups.awaitAll().associate { it.id to it.data }
    ServiceResponse(dataById.toSortedMap())
}
// In this second example, we'll instead catch the exception and wrap it with more context and rethrow
// The net effect is the same (request fails, all requests in flight are cancelled) but the metadata
// surfaced to the exception handler is now augmented with what we provide here
/**
 * Handles a request by wrapping a DB client failure in a [ServiceException] that records the
 * requested range, then rethrowing it.
 *
 * @param request range of ids to look up
 * @return values of every lookup, sorted by id
 * @throws RuntimeException if the range starts below zero or ends before it starts
 * @throws ServiceException if any DB lookup fails with a [DbClientException] (kept as the cause)
 */
suspend fun handleRequestWrapException(request: ServiceRequest): ServiceResponse = supervisorScope {
    // Same up-front range validation the other handlers in this file perform; without it a
    // negative or inverted range would reach the DB client or silently yield an empty response.
    if (request.startRange < 0 || request.endRange < request.startRange) {
        throw RuntimeException("Invalid request range: [${request.startRange}, ${request.endRange}]")
    }
    val requestRange = request.startRange..request.endRange
    val dbResponses = try {
        requestRange
            .map { async { dbClient.makeRequest(DBRequest(it)) } }
            .awaitAll()
    } catch (e: DbClientException) {
        // Rethrowing from the scope body fails the whole request with the extra context attached.
        throw ServiceException("DbClient failure for request range: $requestRange", e)
    }
    val sortedResponseData = dbResponses.associate { it.id to it.data }.toSortedMap()
    ServiceResponse(sortedResponseData)
}
// In this third example, we're instead catching the exception, choosing to swallow it and provide a default value for the entire response
/**
 * Handles a request by replacing the entire response with a fixed default when any lookup fails.
 *
 * Cancellation is not treated as a failure: a [CancellationException] is rethrown so that a
 * cancelled request stops instead of answering with the default payload.
 *
 * @param request range of ids to look up
 * @return values of every lookup sorted by id, or the single default entry `10 -> 150.0` on failure
 * @throws RuntimeException if the range starts below zero or ends before it starts
 */
suspend fun handleRequestDefaultValue(request: ServiceRequest): ServiceResponse {
    if (request.startRange < 0 || request.endRange < request.startRange) {
        throw RuntimeException("Invalid request range: [${request.startRange}, ${request.endRange}]")
    }
    val requestRange = request.startRange..request.endRange
    val dbResponses = try {
        supervisorScope {
            requestRange
                .map { async { dbClient.makeRequest(DBRequest(it)) } }
                .awaitAll()
        }
    } catch (e: CancellationException) {
        // Must come before the broad catch below: swallowing cancellation would break cooperative
        // cancellation and make a cancelled request look like a successful default response.
        throw e
    } catch (e: Exception) {
        // Providing a default value for the entire response if one subrequest failed (but still letting them all be cancelled on failure)
        listOf(DBResponse(10, 150.0))
    }
    val sortedResponseData = dbResponses.associate { it.id to it.data }.toSortedMap()
    return ServiceResponse(sortedResponseData)
}
// In the fourth and final example, we're awaiting all the requests individually (which has the downside that
// we may not catch an exception as soon as it happens, but the end-to-end latency in the cases without exceptions is still the same)
// and providing a default value for each individual failed call
/**
 * Handles a request by substituting a per-id default for each lookup that fails.
 *
 * All lookups are started first and then awaited one by one; a lookup that fails with
 * [DbClientException] contributes `id * 5.0` instead of failing the whole request.
 *
 * @param request range of ids to look up
 * @return values (or per-id defaults) for every id in the range, sorted by id
 * @throws RuntimeException if the range starts below zero or ends before it starts
 */
suspend fun handleRequestIndividualDefaultValue(request: ServiceRequest): ServiceResponse = supervisorScope {
    val rangeIsValid = request.startRange >= 0 && request.endRange >= request.startRange
    if (!rangeIsValid) {
        throw RuntimeException("Invalid request range: [${request.startRange}, ${request.endRange}]")
    }
    // Launch every lookup up front, keeping each request next to its deferred result
    // so the fallback below can be derived from the request that failed.
    val inFlight = (request.startRange..request.endRange).map { id ->
        val dbRequest = DBRequest(id)
        Pair(dbRequest, async { dbClient.makeRequest(dbRequest) })
    }
    val resolved = mutableListOf<DBResponse>()
    for ((dbRequest, deferred) in inFlight) {
        val outcome = try {
            deferred.await()
        } catch (e: DbClientException) {
            DBResponse(dbRequest.id, dbRequest.id * 5.0)
        }
        resolved.add(outcome)
    }
    val dataById = resolved.associate { it.id to it.data }
    ServiceResponse(dataById.toSortedMap())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment