Skip to content

Instantly share code, notes, and snippets.

@mattiamanzati
Created May 19, 2023 14:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mattiamanzati/30ecf0eb1a7fe6f25c860bc03599ed45 to your computer and use it in GitHub Desktop.
Save mattiamanzati/30ecf0eb1a7fe6f25c860bc03599ed45 to your computer and use it in GitHub Desktop.
import * as DU from "@effect/data/Duration"
import { pipe } from "@effect/data/Function"
import * as HM from "@effect/data/HashMap"
import * as O from "@effect/data/Option"
import * as D from "@effect/io/Deferred"
import * as T from "@effect/io/Effect"
import * as FID from "@effect/io/Fiber/Id"
/**
* Given a function (key: K) => Effect<R, E, A> this DataLoader
* batches and caches requests such as concurrent lookups
* for the same key cannot happen.
* Subsequent lookups will use the cache value, if any.
*/
export class DataLoader<K, R, E, A> {
cacheMap: HM.HashMap<K, A> = HM.empty()
pendingArray: [K, D.Deferred<E, A>][] = []
semaphore = T.unsafeMakeSemaphore(1)
constructor(
readonly resolve: (key: K[]) => T.Effect<R, E, A[]>,
readonly cachingEnabled: boolean
) {}
invalidateAll() {
return T.sync(() => {
this.cacheMap = HM.empty()
})
}
run(): T.Effect<R, E, void> {
return pipe(
T.unit(),
T.delay(DU.millis(1)),
T.zipRight(
T.sync(() => {
const items = this.pendingArray.splice(0)
let indexMap: HM.HashMap<K, D.Deferred<E, A>[]> = HM.make() as any
for (const [key, deferred] of items) {
indexMap = pipe(
indexMap,
HM.modifyAt(key, (list) =>
pipe(
list,
O.getOrElse(() => [] as D.Deferred<E, A>[]),
(_) => O.some(_.concat([deferred]))
)
)
)
}
return indexMap
})
),
T.flatMap((indexMap) => {
const keys = Array.from(HM.keys(indexMap))
const allDeferreds = pipe(
indexMap,
HM.values,
(_) => Array.from(_),
(_) => _.flat()
)
return keys.length > 0
? pipe(
this.resolve(keys),
T.flatMap(
T.forEachWithIndex((result, i) =>
pipe(
indexMap,
HM.unsafeGet(keys[i]),
T.forEachPar((deferred) => pipe(deferred, D.succeed(result)))
)
)
),
T.catchAllCause((cause) =>
pipe(
allDeferreds,
T.forEachPar((deferred) => pipe(deferred, D.failCause(cause)))
)
),
T.raceFirst(
pipe(
allDeferreds,
T.forEach((deferred) =>
pipe(
D.await(deferred),
T.catchAllCause(() => T.unit())
)
)
)
),
T.asUnit
)
: T.unit()
}),
this.semaphore.withPermits(1),
T.forkDaemon,
T.asUnit
)
}
lookup(key: K): T.Effect<R, E, A> {
const cacheEntry = pipe(this.cacheMap, HM.get(key))
if (!this.cachingEnabled)
return pipe(
this.resolve([key]),
T.map((_) => _[0])
)
if (this.cachingEnabled && O.isSome(cacheEntry)) return T.succeed(cacheEntry.value)
return T.acquireUseRelease(
T.sync(() => {
const deferred = D.unsafeMake<E, A>(FID.none)
this.pendingArray.push([key, deferred])
return deferred
}),
(deferred) => pipe(this.run(), T.zipParRight(D.await(deferred))),
(deferred) => D.interrupt(deferred)
)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment