@lennyburdette
Last active January 14, 2022 13:20
graphql-kotlin + java-dataloader with suspended functions

This gist provides wrappers around java-dataloader's classes to make them compatible with coroutines, as well as a "dispatcher" mechanism for dispatching the dataloaders enqueued during a GraphQL operation.

Rationale

For context on why this is necessary, see this issue on graphql-java and specifically this comment with a solution that Apollo uses for their own GraphQL services.

The crux of the issue is that graphql-java's dataloader dispatching mechanism assumes one dataloader per field, and does not allow mixing coroutines and dataloaders or composing multiple dataloaders.

The existing dataloader instrumentation keeps track of how many times .load() is called when executing resolvers at one level of field resolution. After resolving all fields in one level, it then calls dispatch() the same number of times.

If you call the dataloader in a suspended function, the graphql-kotlin library wraps it in a future and the inner dataloader isn't visible to the instrumentation.

If you call another dataloader in sequence (e.g. with thenAccept), the second dataloader isn't tracked because the instrumentation has moved on to the next level.

In either case, the GraphQL executor hangs.
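
To make the two failure modes concrete, here is a minimal sketch of the resolver shapes involved; the loader names and the `Person` type are illustrative placeholders, not part of the gist:

import graphql.schema.DataFetchingEnvironment
import java.util.concurrent.CompletableFuture
import kotlinx.coroutines.future.await

data class Person(val id: String, val friendId: String)

class ProblemResolvers {
    // Case 1: a suspended resolver. graphql-kotlin wraps this suspend function in a
    // CompletableFuture, so the level-counting instrumentation never sees the inner load()
    // and never dispatches its batch.
    suspend fun person(id: String, env: DataFetchingEnvironment): Person =
        env.getDataLoader<String, Person>("PERSON_LOADER").load(id).await()

    // Case 2: a second dataloader chained onto the first future. By the time this load()
    // runs, the instrumentation has already moved on to the next level, so the second
    // batch is never dispatched.
    fun bestFriend(id: String, env: DataFetchingEnvironment): CompletableFuture<Person> =
        env.getDataLoader<String, Person>("PERSON_LOADER").load(id)
            .thenCompose { p ->
                env.getDataLoader<String, Person>("FRIEND_LOADER").load(p.friendId)
            }
}

Both shapes leave at least one pending load() that the standard instrumentation never dispatches, which is why execution hangs.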

This is very different from the original JavaScript implementation of dataloader, which uses Node's nextTick or the browser's setTimeout to enqueue a dispatch after some amount of work is done. Single-threaded, evented runtimes are way easier!

The Solution

The Kotlin-friendly solution is to use a debounced flow to enqueue and dispatch work without an event loop.

Each time we call load(), we emit a value on the flow — this is simpler than counting calls per-level.

Dave Glasser's insight was that flow { }.debounce() allows us to enqueue several calls to load() before dispatching the dataloaders, fulfilling the promises for each load() in a batch. Dataloaders are used to batch network calls and database queries that generally take many milliseconds, so a one millisecond delay while we enqueue load() calls is acceptable.
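
As a standalone illustration of that pattern, here is a minimal sketch (independent of the classes below; the names are placeholders): several emissions that arrive within the one-millisecond window collapse into a single downstream collection, just as several load() calls will collapse into a single dispatchAll().

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.debounce
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    channelFlow<Unit> {
        repeat(5) { send(Unit) } // five load() calls enqueued back-to-back
        delay(10)                // a quiet gap: the 1 ms debounce window elapses once
        send(Unit)               // a later load() starts a second batch
        delay(10)
    }
        .debounce(1)                          // wait for 1 ms of silence before emitting
        .collect { println("dispatchAll()") } // prints twice, not six times
}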

// Imports assume kotlinx-coroutines (with the jdk8 `future` integration) and java-dataloader
// on the classpath.
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.debounce
import kotlinx.coroutines.future.await
import kotlinx.coroutines.supervisorScope
import org.dataloader.DataLoader
import org.dataloader.DataLoaderRegistry

// Builds a DataLoaderRegistry whose batch loaders can run on the operation's coroutine scope.
interface KtDataLoaderRegistryFactory {
    fun build(scope: CoroutineScope): DataLoaderRegistry
}

// Wraps a DataLoaderRegistry so every loader it hands out signals the dispatcher after load().
class KtDataLoaderRegistry(
    private val registry: DataLoaderRegistry,
    private val dispatchAfterLoad: suspend () -> Unit
) {
    fun <K, V> getDataLoader(key: String) = KtDataLoader(
        registry.getDataLoader<K, V>(key),
        dispatchAfterLoad
    )
}

// Coroutine-friendly facade over DataLoader: load()/loadMany() suspend until the batch resolves.
class KtDataLoader<K, V>(
    private val dataLoader: DataLoader<K, V>,
    private val dispatchAfterLoad: suspend () -> Unit
) {
    suspend fun load(key: K): V = dataLoader.load(key).also {
        dispatchAfterLoad.invoke()
    }.await()

    suspend fun loadMany(keys: List<K>): List<V> = dataLoader.loadMany(keys).also {
        dispatchAfterLoad.invoke()
    }.await()

    fun clear(key: K) = dataLoader.clear(key)
    fun clearAll() = dataLoader.clearAll()
    fun prime(key: K, value: V) = dataLoader.prime(key, value)
    fun prime(key: K, exception: Exception) = dataLoader.prime(key, exception)
}

// Runs a block (typically one GraphQL execution) and dispatches all dataloaders whenever
// load() calls have been quiet for one millisecond.
class KtDataLoaderDispatcher(
    private val dataLoaderRegistryFactory: KtDataLoaderRegistryFactory
) {
    suspend fun <R> run(block: suspend (registry: KtDataLoaderRegistry) -> R): R {
        var result: R? = null
        supervisorScope {
            val registry = dataLoaderRegistryFactory.build(this)
            channelFlow<Unit> {
                val wrappedRegistry = KtDataLoaderRegistry(registry) { channel.send(Unit) }
                result = block(wrappedRegistry)
            }
                .debounce(1)
                .collect { registry.dispatchAll() }
        }
        return result!!
    }
}

// First, we need a dataloader registry factory, so that the dispatcher can pass in the
// operation's coroutine scope and wrap the resulting dataloaders to emit values on the flow.
class DataLoaderRegistryFactory(val client: Client) : KtDataLoaderRegistryFactory {
    override fun build(scope: CoroutineScope) = DataLoaderRegistry()
        .register("LOADER_ONE", newDataLoader<String, Foo> { ids ->
            scope.future { client.batchFetch(ids) }
        })
}
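
// For completeness, assumed shapes for the placeholder `Client` and `Foo` types used above
// (hypothetical; substitute your own domain types and batch-fetching client):
data class Foo(val id: String)

interface Client {
    suspend fun batchFetch(ids: List<String>): List<Foo>
}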

// Second, we wrap GraphQL execution in the dispatcher and attach the wrapped dataloaders
// to the GraphQL context.
val dispatcher = KtDataLoaderDispatcher(DataLoaderRegistryFactory(client))

dispatcher.run { dataloaders ->
    graphql.executeAsync { builder ->
        builder.query("{ hello { world } }")
            .context(MyContext(dataloaders))
    }.await().toSpecification()
}

// The dispatcher yields wrapped dataloaders to the lambda; they are automatically configured
// to emit values on the flow, but the API (including `load()` and `loadMany()`) is the same.

// Finally, we can access the dataloaders in resolvers using the context:
class Query {
    suspend fun hello(context: MyContext): Foo {
        return context.dataloaders.getDataLoader<String, Foo>("LOADER_ONE").load("id")
    }
}
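
// A minimal sketch of the `MyContext` class assumed above. The name and its `dataloaders`
// property are placeholders; in a real graphql-kotlin service this class would also implement
// the library's GraphQL context contract so the framework can inject it into resolvers.
class MyContext(val dataloaders: KtDataLoaderRegistry)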