Created
June 22, 2020 22:48
-
-
Save seanabraham/3a338e117f7d12297dccca4dbc669e8a to your computer and use it in GitHub Desktop.
Some kotlin coroutine exception handling
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@file:OptIn(ExperimentalTime::class, ExperimentalCoroutinesApi::class) | |
package me.seanabraham.coroutine.playground | |
import kotlinx.coroutines.* | |
import mu.KotlinLogging | |
import kotlin.random.Random | |
import kotlin.random.nextInt | |
import kotlin.time.Duration | |
import kotlin.time.ExperimentalTime | |
import kotlin.time.TimeSource | |
import kotlin.time.milliseconds | |
// File-level logger shared by main's CoroutineExceptionHandler and the emitResponse/emitServerError helpers.
val log = KotlinLogging.logger {}
// Assume this is all internal framework handling, coroutine launched on calling thread from request handling pool | |
/**
 * Demo entry point.
 *
 * Builds a supervisor-backed request scope, then pushes the same [ServiceRequest] through each
 * handler strategy one at a time, printing a banner per case and reporting either the response
 * or the failure together with the elapsed time.
 */
fun main() {
    runBlocking {
        // Parent the supervisor to runBlocking's own job so the request scope lives inside this block.
        val supervisor = SupervisorJob(coroutineContext[Job]!!)
        val exceptionHandler = CoroutineExceptionHandler { _, failure ->
            log.error(failure) { "Caught $failure in the CoroutineExceptionHandler" }
        }
        val requestScope = CoroutineScope(Dispatchers.Default + supervisor + exceptionHandler)

        val incomingRequest = ServiceRequest(1, 20)
        val requestHandlers = mapOf(
            "Propogate client exception" to ::handleRequest,
            "Wrap exception from service handling code" to ::handleRequestWrapException,
            "Return a default value for the entire request" to ::handleRequestDefaultValue,
            "Return a default value for individually failed downstream requests" to ::handleRequestIndividualDefaultValue
        )

        // The cases run strictly one after another so that each one gets its own timing;
        // this is for demonstration only and is not how real request handling would be driven.
        for ((caseName, requestHandler) in requestHandlers) {
            val startMark = TimeSource.Monotonic.markNow()
            val pendingResponse = requestScope.async { requestHandler(incomingRequest) }

            println("$caseName:")
            println("========================================================")
            try {
                val payload = pendingResponse.await()
                emitResponse(payload, startMark.elapsedNow())
            } catch (e: Exception) {
                emitServerError(e, startMark.elapsedNow())
            }
            println("--------------------------------------------------------\n")
        }

        // Without this the supervisor job stays active and runBlocking would never return.
        supervisor.complete()
    }
}
/**
 * Logs the simulated successful (HTTP 200) outcome of a request.
 *
 * @param response the response whose `data` payload is included in the log line
 * @param duration elapsed handling time, reported in milliseconds
 */
fun emitResponse(response: ServiceResponse, duration: Duration) {
    val elapsedMs = duration.inMilliseconds
    val payload = response.data
    // In practice this is where a real response would be written back to the caller.
    log.info("Responding with: HTTP 200 after ${elapsedMs}ms. Payload $payload")
}
/**
 * Logs the simulated failed (HTTP 500) outcome of a request, attaching the exception to the log entry.
 *
 * @param exception the failure that ended the request
 * @param duration elapsed handling time, reported in milliseconds
 */
fun emitServerError(exception: Exception, duration: Duration) {
    log.error(exception) {
        val elapsedMs = duration.inMilliseconds
        "Responding with: HTTP 500 after ${elapsedMs}ms"
    }
}
/** Fake database client used by the demo: every call suspends for a while, and one specific id always fails. */
class DBClient {
    /**
     * Simulates a single lookup.
     *
     * @param request the lookup to perform
     * @return a [DBResponse] echoing the request id, with `data` equal to the id times 10.0
     * @throws DbClientException when the request id is 15 (after a 300 ms delay)
     */
    suspend fun makeRequest(request: DBRequest): DBResponse {
        val id = request.id
        if (id != 15) {
            // Happy path: random latency between 1000 and 2000 ms before answering.
            val simulatedLatency = Random.nextInt(1000..2000).milliseconds
            delay(simulatedLatency)
            return DBResponse(id, id * 10.0)
        }
        // Failure path: fails well before any successful call could complete.
        delay(300)
        throw DbClientException("Request with id ${request.id} failed")
    }
}
/** Incoming service request covering the ids from [startRange] to [endRange], both inclusive. */
data class ServiceRequest(
    val startRange: Int,
    val endRange: Int
)

/** Lookup of a single [id] sent to the database client. */
data class DBRequest(
    val id: Int
)

/** Result of a single database lookup: the requested [id] and its [data] value. */
data class DBResponse(
    val id: Int,
    val data: Double
)

/** Service-level response mapping each id to its value. */
data class ServiceResponse(
    val data: Map<Int, Double>
)

/** Failure raised by the database client. */
class DbClientException(message: String) : RuntimeException(message)

/** Service-level failure that can carry the underlying [cause]. */
class ServiceException(message: String, cause: Throwable?) : RuntimeException(message, cause)
// Single shared fake database client used by every request handler in this file.
val dbClient: DBClient = DBClient()
// Below is what a service developer might implement when handling a request | |
// -------------------------------------------------------------------------------------------------------- // | |
// In the examples below, the entire body of the suspending function is wrapped in a "supervisorScope" call | |
// This is mostly for the purpose of launching other coroutines (`async` below) but also for defining | |
// the structure with which they launch (i.e. cancelling or throwing an exception from the outer scope will cancel all | |
// of the coroutines launched within it to prevent leaks). | |
// A supervisorScope is used for flexibility as the failure of a child does not necessarily cancel other children (i.e. | |
// we can catch the exception and swallow it or rethrow from the outer scope to actually cancel them). | |
// If we wanted to automatically cancel all children and the scope on the failure of a single one we could use a | |
// coroutineScope instead | |
// In this first example, we will simply propagate any exception that may be thrown by the underlying DbClient
// by not catching it. If there's no reason to add special handling and the underlying exception is descriptive enough | |
// this is a reasonable path forward. | |
/**
 * Handles a request by issuing one DB lookup per id in the requested range and letting any
 * failure propagate to the caller untouched.
 *
 * @param request the inclusive id range to look up
 * @return the looked-up values keyed by id, in ascending id order
 * @throws RuntimeException if the range starts below zero or ends before it starts
 */
suspend fun handleRequest(request: ServiceRequest): ServiceResponse = supervisorScope {
    val (firstId, lastId) = request
    val rangeIsValid = firstId >= 0 && lastId >= firstId
    if (!rangeIsValid) {
        throw RuntimeException("Invalid request range: [${request.startRange}, ${request.endRange}]")
    }

    // Deliberately naive: one concurrent request per id. Real code would rarely want exactly this.
    val pendingLookups = (firstId..lastId).map { id ->
        async { dbClient.makeRequest(DBRequest(id)) }
    }
    val rows = pendingLookups.awaitAll()

    val valuesById = rows.associate { row -> row.id to row.data }
    ServiceResponse(valuesById.toSortedMap())
}
// In this second example, we'll instead catch the exception and wrap it with more context and rethrow | |
// The net effect is the same (request fails, all requests in flight are cancelled) but the metadata | |
// surfaced to the exception handler is now augmented with what we provide here | |
/**
 * Handles a request by issuing one DB lookup per id in the requested range, wrapping any
 * [DbClientException] in a [ServiceException] that records which range was being served.
 *
 * The request still fails as a whole; only the exception surfaced to the caller is enriched.
 *
 * @param request the inclusive id range to look up
 * @return the looked-up values keyed by id, in ascending id order
 * @throws RuntimeException if the range starts below zero or ends before it starts
 * @throws ServiceException if any DB lookup fails with a [DbClientException]
 */
suspend fun handleRequestWrapException(request: ServiceRequest): ServiceResponse = supervisorScope {
    // Same range validation as the other handlers in this file; without it an invalid range
    // would either query negative ids or quietly produce an empty response.
    if (request.startRange < 0 || request.endRange < request.startRange) {
        throw RuntimeException("Invalid request range: [${request.startRange}, ${request.endRange}]")
    }
    val requestRange = request.startRange..request.endRange
    val dbResponses = try {
        requestRange
            .map { async { dbClient.makeRequest(DBRequest(it)) } }
            .awaitAll()
    } catch (e: DbClientException) {
        // Throwing out of the supervisorScope body fails the scope, which cancels the lookups still in flight.
        throw ServiceException("DbClient failure for request range: $requestRange", e)
    }
    val sortedResponseData = dbResponses.associate { it.id to it.data }.toSortedMap()
    ServiceResponse(sortedResponseData)
}
// In this third example, we're instead catching the exception, choosing to swallow it and provide a default value for the entire response | |
/**
 * Handles a request by issuing one DB lookup per id in the requested range; if the batch fails,
 * the failure is swallowed and a fixed default payload is returned for the whole response.
 *
 * Cancellation is not treated as a failure: a [CancellationException] is rethrown so that a
 * cancelled request stops instead of answering with the default payload.
 *
 * @param request the inclusive id range to look up
 * @return the looked-up values keyed by id in ascending order, or the single default entry on failure
 * @throws RuntimeException if the range starts below zero or ends before it starts
 */
suspend fun handleRequestDefaultValue(request: ServiceRequest): ServiceResponse {
    if (request.startRange < 0 || request.endRange < request.startRange) {
        throw RuntimeException("Invalid request range: [${request.startRange}, ${request.endRange}]")
    }
    val requestRange = request.startRange..request.endRange
    val dbResponses = try {
        // The scope sits inside the try so that a failure escaping it cancels the remaining lookups
        // before the fallback below is produced.
        supervisorScope {
            requestRange
                .map { async { dbClient.makeRequest(DBRequest(it)) } }
                .awaitAll()
        }
    } catch (e: CancellationException) {
        // Cancellation must keep propagating; swallowing it would break cooperative cancellation.
        throw e
    } catch (e: Exception) {
        // Any sub-request failure replaces the entire response with one default entry.
        listOf(DBResponse(10, 150.0))
    }
    val sortedResponseData = dbResponses.associate { it.id to it.data }.toSortedMap()
    return ServiceResponse(sortedResponseData)
}
// In the fourth and final example, we're awaiting all the requests individually (which does have the downside of the fact that | |
// we may not catch an exception as soon as it happens but the end-to-end latency in the cases without exceptions is still the same) | |
// and providing a default value for each individual failed call
/**
 * Handles a request by issuing one DB lookup per id in the requested range and awaiting each
 * lookup on its own, so a lookup that fails with [DbClientException] is replaced by a per-id
 * default (id times 5.0) while the other results are kept.
 *
 * @param request the inclusive id range to look up
 * @return the looked-up (or defaulted) values keyed by id, in ascending id order
 * @throws RuntimeException if the range starts below zero or ends before it starts
 */
suspend fun handleRequestIndividualDefaultValue(request: ServiceRequest): ServiceResponse = supervisorScope {
    if (request.startRange < 0 || request.endRange < request.startRange) {
        throw RuntimeException("Invalid request range: [${request.startRange}, ${request.endRange}]")
    }

    // Start every lookup first, remembering which request each deferred result belongs to.
    val inFlight = (request.startRange..request.endRange)
        .map(::DBRequest)
        .associateWith { dbRequest -> async { dbClient.makeRequest(dbRequest) } }

    // Then await them one by one, in request order, substituting a default for each failed lookup.
    val resolved = inFlight.map { (dbRequest, pending) ->
        try {
            pending.await()
        } catch (e: DbClientException) {
            DBResponse(dbRequest.id, dbRequest.id * 5.0)
        }
    }

    val valuesById = resolved.associateTo(sortedMapOf<Int, Double>()) { row -> row.id to row.data }
    ServiceResponse(valuesById)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment