Skip to content

Instantly share code, notes, and snippets.

@tim-smart
Created October 29, 2023 20:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tim-smart/3d21a88c7d8a5765052713f5bcc1eb09 to your computer and use it in GitHub Desktop.
Save tim-smart/3d21a88c7d8a5765052713f5bcc1eb09 to your computer and use it in GitHub Desktop.
import {
Chunk,
Deferred,
Duration,
Effect,
Queue,
Ref,
Request,
RequestResolver,
} from "effect"
/**
 * A single queued unit of work for the data loader: the request a caller
 * submitted, paired with the `Deferred` through which that caller receives
 * the request's outcome (its error or success type, derived from `A`).
 */
type DataLoaderItem<A extends Request.Request<any, any>> = {
  /** The request as submitted by the caller. */
  readonly request: A
  /** Completed with the request's exit once the request has been executed. */
  readonly deferred: Deferred.Deferred<
    Request.Request.Error<A>,
    Request.Request.Success<A>
  >
}
/**
 * Wraps a `RequestResolver` so that requests issued within a time window are
 * collected and executed together against the underlying resolver.
 *
 * How it works:
 * - The returned resolver does not run requests itself. For each request it
 *   creates a `Deferred`, enqueues `{ request, deferred }`, and waits on the
 *   `Deferred`.
 * - A background fiber (forked into the surrounding scope) loops forever:
 *   it blocks until one item arrives, keeps collecting further items until
 *   either `options.window` elapses or `options.maxBatchSize` items have been
 *   collected, and then runs every collected request through `self` with
 *   batching enabled, completing each item's `Deferred` with the resulting
 *   exit.
 *
 * @param self - The resolver that ultimately executes the collected requests.
 * @param options.window - How long to keep collecting after the first item of
 *   a batch has arrived.
 * @param options.maxBatchSize - Optional upper bound on the number of items
 *   collected into one batch. When omitted, only `window` ends a batch.
 * @returns An effect that yields the batching resolver. It forks a scoped
 *   fiber and registers a scoped finalizer, so it must be run inside a scope.
 */
export const dataLoader = <A extends Request.Request<any, any>>(
  self: RequestResolver.RequestResolver<A, never>,
  options: {
    readonly window: Duration.DurationInput
    readonly maxBatchSize?: number
  },
) =>
  Effect.gen(function* (_) {
    // Tie the queue's lifetime to the scope. The worker fiber below is
    // interrupted when the scope closes; without shutting the queue down as
    // well, a request submitted afterwards would be enqueued with nobody left
    // to drain it and its caller would wait on the Deferred forever.
    const queue = yield* _(
      Effect.acquireRelease(Queue.unbounded<DataLoaderItem<A>>(), q =>
        Queue.shutdown(q),
      ),
    )

    // Items collected so far for the batch currently being assembled. Kept in
    // a Ref (rather than as the result of the collecting effect) so that items
    // already taken survive the timeout interrupting the collection.
    const batch = yield* _(Ref.make(Chunk.empty<DataLoaderItem<A>>()))

    // True once the batch has reached the configured size limit (never true
    // when no limit was configured).
    const isFull = (items: Chunk.Chunk<DataLoaderItem<A>>) =>
      options.maxBatchSize !== undefined &&
      items.length >= options.maxBatchSize

    // Wait for the next queued item, append it to the batch, and return the
    // updated batch.
    const takeOne = Effect.flatMap(Queue.take(queue), item =>
      Ref.updateAndGet(batch, Chunk.append(item)),
    )

    // Hand out the collected items and reset the accumulator for the next
    // round.
    const flush = Ref.getAndSet(batch, Chunk.empty<DataLoaderItem<A>>())

    // Keep taking items until the batch is full or the window elapses. The
    // outcome of the timed collection itself is discarded; only the contents
    // of `batch` matter.
    const takeRest = takeOne.pipe(
      Effect.repeatUntil(isFull),
      Effect.timeout(options.window),
      Effect.zipRight(flush),
    )

    // Block for the first item (the window only starts once something has
    // arrived). `repeatUntil` always runs `takeOne` once more before testing
    // the size, so if the first item alone already fills the batch
    // (e.g. `maxBatchSize: 1`) flush immediately instead of collecting more.
    const nextBatch = Effect.flatMap(takeOne, items =>
      isFull(items) ? flush : takeRest,
    )

    yield* _(
      nextBatch,
      // Drop items whose Deferred is already done — the returned resolver
      // interrupts the Deferred when its caller is interrupted.
      Effect.flatMap(
        Effect.filter(({ deferred }) => Deferred.isDone(deferred), {
          negate: true,
        }),
      ),
      // Run the remaining requests through the underlying resolver with
      // batching enabled, and forward each outcome (success or failure) to
      // the waiting caller.
      Effect.flatMap(
        Effect.forEach(
          ({ request, deferred }) =>
            Effect.flatMap(Effect.exit(Effect.request(request, self)), exit =>
              Deferred.complete(deferred, exit),
            ),
          { batching: true, discard: true },
        ),
      ),
      Effect.forever,
      Effect.forkScoped,
    )

    return RequestResolver.fromFunctionEffect((request: A) =>
      Effect.flatMap(
        Deferred.make<Request.Request.Error<A>, Request.Request.Success<A>>(),
        deferred =>
          Queue.offer(queue, { request, deferred }).pipe(
            Effect.zipRight(Deferred.await(deferred)),
            // Mark the item as done so the worker skips it if the caller
            // goes away before the batch is executed.
            Effect.onInterrupt(() => Deferred.interrupt(deferred)),
          ),
      ),
    )
  })
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment