Skip to content

Instantly share code, notes, and snippets.

@mattiamanzati
Created October 29, 2023 11:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mattiamanzati/4f0790d6185cba6169fb0b8df775bf83 to your computer and use it in GitHub Desktop.
Save mattiamanzati/4f0790d6185cba6169fb0b8df775bf83 to your computer and use it in GitHub Desktop.
Effect dataloader
import * as D from "effect/Deferred"
import * as DU from "effect/Duration"
import * as T from "effect/Effect"
import * as FID from "effect/FiberId"
import { pipe } from "effect/Function"
import * as HM from "effect/HashMap"
import * as O from "effect/Option"
/**
* Given a function (key: K) => Effect<R, E, A> this DataLoader
* batches and caches requests such as concurrent lookups
* for the same key cannot happen.
* Subsequent lookups will use the cache value, if any.
*/
export class DataLoader<K, R, E, A> {
cacheMap: HM.HashMap<K, A> = HM.empty()
pendingArray: [K, D.Deferred<E, A>][] = []
semaphore = T.unsafeMakeSemaphore(1)
constructor(
readonly resolve: (key: K[]) => T.Effect<R, E, A[]>,
readonly cachingEnabled: boolean
) {}
invalidateAll = T.sync(() => {
this.cacheMap = HM.empty()
})
run: T.Effect<R, E, void> = pipe(
T.sleep(DU.millis(1)),
T.zipRight(
T.sync(() => {
const items = this.pendingArray.splice(0)
let indexMap: HM.HashMap<K, D.Deferred<E, A>[]> = HM.make() as any
for (const [key, deferred] of items) {
indexMap = pipe(
indexMap,
HM.modifyAt(key, (list) =>
pipe(
list,
O.getOrElse(() => [] as D.Deferred<E, A>[]),
(_) => O.some(_.concat([deferred]))
)
)
)
}
return indexMap
})
),
T.flatMap((indexMap) => {
const keys = Array.from(HM.keys(indexMap))
const allDeferreds = pipe(
indexMap,
HM.values,
(_) => Array.from(_),
(_) => _.flat()
)
return keys.length > 0
? pipe(
this.resolve(keys),
T.flatMap(
T.forEach((result, i) =>
pipe(
indexMap,
HM.unsafeGet(keys[i]),
T.forEach((deferred) => pipe(deferred, D.succeed(result)), {
concurrency: "inherit"
})
)
)
),
T.catchAllCause((cause) =>
pipe(
allDeferreds,
T.forEach((deferred) => pipe(deferred, D.failCause(cause)), {
concurrency: "inherit"
})
)
),
T.raceFirst(
pipe(
allDeferreds,
T.forEach((deferred) =>
pipe(
D.await(deferred),
T.catchAllCause(() => T.unit)
)
)
)
),
T.asUnit
)
: T.unit
}),
this.semaphore.withPermits(1),
T.forkDaemon,
T.asUnit
)
lookup(key: K): T.Effect<R, E, A> {
const cacheEntry = pipe(this.cacheMap, HM.get(key))
if (!this.cachingEnabled)
return pipe(
this.resolve([key]),
T.map((_) => _[0])
)
if (this.cachingEnabled && O.isSome(cacheEntry)) return T.succeed(cacheEntry.value)
return T.acquireUseRelease(
T.sync(() => {
const deferred = D.unsafeMake<E, A>(FID.none)
this.pendingArray.push([key, deferred])
return deferred
}),
(deferred) => pipe(this.run, T.zipRight(D.await(deferred), { concurrent: true })),
(deferred) => D.interrupt(deferred)
)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment