Skip to content

Instantly share code, notes, and snippets.

@schickling
Created August 15, 2023 21:16
Show Gist options
  • Save schickling/4624bc16c68b586b9bf0d1d0e05ac931 to your computer and use it in GitHub Desktop.
Save schickling/4624bc16c68b586b9bf0d1d0e05ac931 to your computer and use it in GitHub Desktop.
import * as Context from '@effect/data/Context'
import * as Duration from '@effect/data/Duration'
import type * as Either from '@effect/data/Either'
import { pipe } from '@effect/data/Function'
import * as Deferred from '@effect/io/Deferred'
import * as FiberRefs from '@effect/io/FiberRefs'
import * as Queue from '@effect/io/Queue'
import * as Effect from './Effect.js'
import * as Otel from './Otel/index.js'
/**
 * Configuration accepted by `makeDataLoader`.
 */
export type DataLoaderOptions = {
  /**
   * Identifier reported as `batchKey` on the `DataLoader:batchedLoad` span and on the
   * `dataloader_batch_size` histogram.
   */
  otelBatchKey: string
  /**
   * Upper bound for the number of keys passed to a single `batchCallback` invocation.
   * When omitted, `makeDataLoader` uses `Number.POSITIVE_INFINITY` (no limit).
   */
  maxBatchSize?: number
  /**
   * How long (in milliseconds) the dataloader keeps waiting for additional keys, counted from the
   * moment the first key(s) of a batch were taken from the queue, before `batchCallback` is called.
   *
   * @default 10
   *
   * TODO replace with a more sophisticated scheduler (e.g. https://github.com/graphql/dataloader#batch-scheduling)
   */
  batchTimeoutMs?: number
}
/**
 * Creates a data loader that collects individual `load` calls into batches and resolves them
 * with a single `batchCallback` invocation per batch.
 *
 * How it works:
 * - Every `load(key)` enqueues the key together with a `Deferred`, the caller's context and the
 *   caller's fiber refs, then waits on the `Deferred`.
 * - A background loop (forked with `Effect.forkScoped`) takes items from the queue until either
 *   `maxBatchSize` items were collected or `batchTimeoutMs` elapsed since the first item(s) of the
 *   batch were taken, and then runs `batchCallback` with the collected keys.
 * - `batchCallback` runs with the context and fiber refs of the FIRST item in the batch.
 *
 * Failure handling:
 * - A `Left` result fails the matching `load` with that error, a `Right` result succeeds it.
 * - `batchCallback` must return one result per key, in key order. Keys without a result make the
 *   corresponding `load` die with a descriptive error (instead of hanging forever); surplus results
 *   are ignored.
 * - If running a batch fails with a defect, the cause is logged, forwarded to every still-pending
 *   `load` of that batch, and the loop carries on with the next batch.
 *
 * @param batchCallback - Resolves a batch of keys; the result at index `i` belongs to `keys[i]`.
 * @param options - See `DataLoaderOptions`.
 * @returns An effect yielding `{ load }`, where `load` accepts a single key or an array of keys.
 *   NOTE: an array argument is always treated as a list of keys (`Array.isArray`), so array-typed
 *   keys cannot be loaded individually.
 */
export const makeDataLoader = <R, Err, TKey, TOut>(
  batchCallback: (keys: ReadonlyArray<TKey>) => Effect.Effect<R, never, ReadonlyArray<Either.Either<Err, TOut>>>,
  { batchTimeoutMs = 10, maxBatchSize = Number.POSITIVE_INFINITY, otelBatchKey }: DataLoaderOptions,
) =>
  Effect.gen(function* ($) {
    type QueueItem = [
      key: TKey,
      deferred: Deferred.Deferred<Err, TOut>,
      env: Context.Context<R>,
      refs: FiberRefs.FiberRefs,
    ]

    const queue = yield* $(Queue.unbounded<QueueItem>())

    /** Enqueues one key (with the caller's context + fiber refs) and waits for its result. */
    const loadSingle = (key: TKey): Effect.Effect<R, Err, TOut> =>
      pipe(
        Effect.Do,
        Effect.bind('deferred', () => Deferred.make<Err, TOut>()),
        Effect.bind('env', () => Effect.context<R>()),
        Effect.bind('refs', () => Effect.getFiberRefs),
        Effect.tap(({ deferred, env, refs }) => Queue.offer(queue, [key, deferred, env, refs])),
        Effect.flatMap(({ deferred }) => Deferred.await(deferred)),
      )

    function load(key: TKey): Effect.Effect<R, Err, TOut>
    function load(key: ReadonlyArray<TKey>): Effect.Effect<R, Err, ReadonlyArray<TOut>>
    // eslint-disable-next-line prefer-arrow/prefer-arrow-functions
    function load(key: TKey | ReadonlyArray<TKey>): Effect.Effect<R, Err, TOut | ReadonlyArray<TOut>> {
      return Array.isArray(key)
        ? Effect.forEach(key, loadSingle, { concurrency: 'unbounded' })
        : loadSingle(key as TKey)
    }

    let batchRunNumber = 0

    /**
     * Collects the next batch: blocks until at least one item is available, then keeps taking items
     * until the batch is full or the timeout (measured from the first take) has elapsed.
     */
    const collectBatch = Effect.gen(function* ($) {
      const batch: QueueItem[] = []
      let timeOfFirstItemMs = 0

      // Never negative, so a late loop iteration cannot hand a negative duration to `Effect.timeout`
      const remainingTimeMs = () => Math.max(0, batchTimeoutMs - (Date.now() - timeOfFirstItemMs))

      while (true) {
        if (batch.length === 0) {
          // Nothing collected yet: wait (without a timeout) for the first item(s)
          const itemChunk = yield* $(Queue.takeBetween(queue, 1, maxBatchSize - batch.length))
          batch.push(...itemChunk)
          timeOfFirstItemMs = Date.now()
        } else {
          const itemChunkOption = yield* $(
            Queue.takeBetween(queue, 1, maxBatchSize - batch.length),
            Effect.timeout(Duration.millis(remainingTimeMs())),
          )

          // Timeout elapsed without new items
          if (itemChunkOption._tag === 'None') {
            break
          }

          batch.push(...itemChunkOption.value)

          // NOTE this can happen if the timeout above didn't trigger yet and still too much time has passed
          if (remainingTimeMs() <= 0) {
            break
          }
        }

        if (batch.length === maxBatchSize) {
          break
        }
      }

      return batch
    })

    /**
     * Runs `batchCallback` for one collected batch and completes every deferred of that batch.
     * Never fails (except for interruption), so the surrounding `forever` loop keeps running.
     */
    const runBatch = (queueItemsBatch: ReadonlyArray<QueueItem>) => {
      const keys = queueItemsBatch.map(([key]) => key)
      // `collectBatch` always yields at least one item; the first item's context/refs are used for the whole batch
      const env = queueItemsBatch[0]![2]
      const refs = queueItemsBatch[0]![3]

      return pipe(
        Effect.acquireUseRelease(
          // Remember the loop fiber's own refs so they can be restored after the batch ran
          Effect.getFiberRefs,
          () =>
            Effect.contextWithEffect((parentContext: Context.Context<never>) =>
              Effect.flatMap(FiberRefs.setAll(refs), () =>
                pipe(
                  batchCallback([...keys]),
                  Effect.tap((results) =>
                    // Iterate over the batch (not the results) so every deferred gets completed,
                    // even if `batchCallback` returned too few (or too many) results
                    Effect.forEach(queueItemsBatch, ([, deferred], index) => {
                      const result = results[index]
                      if (result === undefined) {
                        return Deferred.die(
                          deferred,
                          new Error(
                            `DataLoader (${otelBatchKey}): batchCallback returned ${results.length} results for ${queueItemsBatch.length} keys (no result for key at index ${index})`,
                          ),
                        )
                      }

                      return result._tag === 'Left'
                        ? Deferred.fail(deferred, result.left)
                        : Deferred.succeed(deferred, result.right)
                    }),
                  ),
                  Otel.withSpan('DataLoader:batchedLoad', {
                    attributes: {
                      batchKey: otelBatchKey,
                      maxBatchSize,
                      batchTimeoutMs,
                      batchSize: queueItemsBatch.length,
                      batchRunNumber,
                    },
                  }),
                  Otel.histogram(`dataloader_batch_size`, queueItemsBatch.length, { batchKey: otelBatchKey }),
                  Effect.provideSomeContext(Context.merge(parentContext, env)),
                ),
              ),
            ),
          (oldRefs) => Effect.setFiberRefs(oldRefs),
        ),
        Effect.tapCauseLogPretty,
        // A defect in one batch must neither leave its callers waiting forever nor stop the loop:
        // forward the cause to all deferreds that are still pending (completed ones are unaffected).
        Effect.catchAllCause((cause) =>
          Effect.forEach(queueItemsBatch, ([, deferred]) => Deferred.failCause(deferred, cause)),
        ),
      )
    }

    yield* $(
      collectBatch,
      Effect.flatMap(runBatch),
      Effect.tap(() => Effect.sync(() => batchRunNumber++)),
      Effect.forever,
      Effect.tapCauseLogPretty,
      Effect.interruptible,
      Effect.forkScoped,
    )

    return { load }
  })
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment