Skip to content

Instantly share code, notes, and snippets.

@tim-smart
Last active November 23, 2023 01:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tim-smart/b33994646435aa860cc95c31ea5d5129 to your computer and use it in GitHub Desktop.
Save tim-smart/b33994646435aa860cc95c31ea5d5129 to your computer and use it in GitHub Desktop.
import { Schema, TreeFormatter } from "@effect/schema"
import {
Cause,
Context,
Data,
Effect,
Either,
Layer,
Option,
PrimaryKey,
ReadonlyArray,
Request,
RequestResolver,
Scope,
pipe,
} from "effect"
import * as Lmdb from "lmdb"
/**
 * Tagged error (tag "StorageError") raised by the storage layers in this file.
 *
 * - `method`: name of the storage operation that failed.
 * - `reason`: coarse failure category. "Encode" is used when a value cannot be
 *   encoded with its schema and "Unknown" wraps failed backing-store calls;
 *   "Construction" presumably covers failures while creating a store — confirm
 *   against the call sites.
 * - `message`: human-readable description of the failure.
 */
class StorageError extends Data.TaggedError("StorageError")<{
readonly method: string
readonly reason: "Unknown" | "Encode" | "Construction"
readonly message: string
}> {}
/** Options shared by every storage factory in this file. */
export interface StorageOptions {
// Identifier of the store to create/open; the LMDB backend uses it as the
// name of the sub-database it opens.
readonly storeId: string
}
/** Service that creates raw (schema-agnostic) key/value stores. */
export interface BackingStorageFactory {
// Creates a store for the given options. The effect requires a `Scope` and
// may fail with a `StorageError`.
readonly make: (
options: StorageOptions,
) => Effect.Effect<Scope.Scope, StorageError, BackingStorage>
}
/** Context tag used to provide and look up the `BackingStorageFactory` service. */
export const BackingStorageFactory = Context.Tag<BackingStorageFactory>(
"app/BackingStorageFactory",
)
/** Raw key/value store holding untyped (`unknown`) values under string keys. */
export interface BackingStorage {
// Looks up several keys at once. The callers in this file rely on the result
// being index-aligned with `keys`, with `None` standing for a missing entry.
readonly getAll: (
keys: Array<string>,
) => Effect.Effect<never, StorageError, Array<Option.Option<unknown>>>
// Stores `value` under `key`.
readonly set: (
key: string,
value: unknown,
) => Effect.Effect<never, StorageError, void>
// Removes the entry stored under `key`.
readonly delete: (key: string) => Effect.Effect<never, StorageError, void>
}
/** Service that creates typed stores whose values are described by a schema. */
export interface SchemaStorageFactory {
// Creates a store for values of type `A` described by `schema`. The effect
// requires a `Scope` and may fail with a `StorageError`.
readonly make: <I, A>(
schema: Schema.Schema<I, A>,
options: StorageOptions,
) => Effect.Effect<Scope.Scope, StorageError, SchemaStorage<A>>
}
/** Context tag used to provide and look up the `SchemaStorageFactory` service. */
export const SchemaStorageFactory = Context.Tag<SchemaStorageFactory>(
"app/SchemaStorageFactory",
)
/** Typed key/value store for values of type `A`. */
export interface SchemaStorage<A> {
// Looks up several keys at once; one `Option` is returned per requested key.
readonly getAll: (
keys: Array<string>,
) => Effect.Effect<never, StorageError, Array<Option.Option<A>>>
// Stores `value` under `key`.
readonly set: (
key: string,
value: A,
) => Effect.Effect<never, StorageError, void>
}
/**
 * Live `SchemaStorageFactory`, layered on top of whatever
 * `BackingStorageFactory` is available in the environment.
 *
 * Reads are decoded with the schema: an entry that fails to decode is deleted
 * from the backing store and reported as `None`. Writes are encoded with the
 * schema first; an encoding failure becomes a `StorageError` with reason
 * "Encode".
 */
export const SchemaStorageFactoryLive = Layer.effect(
  SchemaStorageFactory,
  Effect.gen(function* ($) {
    const backingFactory = yield* $(BackingStorageFactory)

    return SchemaStorageFactory.of({
      make: (schema, options) =>
        Effect.gen(function* ($) {
          const backing = yield* $(backingFactory.make(options))
          const decode = Schema.parse(schema)
          const encode = Schema.encode(schema)

          return {
            getAll: (keys) =>
              Effect.flatMap(backing.getAll(keys), (rawEntries) =>
                Effect.forEach(rawEntries, (rawEntry, index) =>
                  Option.match(rawEntry, {
                    // Nothing stored under this key.
                    onNone: () => Effect.succeedNone,
                    // Something is stored: decode it. On a decode failure the
                    // entry is evicted, and `Effect.option` turns any failure
                    // (decode or eviction) into `None`.
                    onSome: (rawValue) =>
                      decode(rawValue).pipe(
                        Effect.tapError(() => backing.delete(keys[index])),
                        Effect.option,
                      ),
                  }),
                ),
              ),

            set: (key, value) =>
              Effect.flatMap(
                Effect.mapError(
                  encode(value),
                  (parseError) =>
                    new StorageError({
                      method: "set",
                      reason: "Encode",
                      message: TreeFormatter.formatErrors(parseError.errors),
                    }),
                ),
                (encoded) => backing.set(key, encoded),
              ),
          }
        }),
    })
  }),
)
/**
 * Options for `persisted`: the storage options plus the schemas used to
 * encode/decode a request's failure (`EI` <-> `EA`) and success (`AI` <-> `AA`)
 * values.
 */
export interface PersistedResolverOptions<EI, EA, AI, AA>
extends StorageOptions {
// Schema for the request's error value.
readonly Failure: Schema.Schema<EI, EA>
// Schema for the request's success value.
readonly Success: Schema.Schema<AI, AA>
}
/**
 * Wraps a request resolver with a persistent, schema-validated result cache.
 *
 * Every request is keyed as `<_tag>:<primary key>`. When a batch arrives, all
 * keys are looked up in storage first. Requests with a stored result are
 * completed from it; the rest are forwarded to `self`, and each outcome is
 * offered to storage before the request is completed.
 *
 * Stored values are `Either<failure, success>`, described by
 * `Schema.either(options.Failure, options.Success)`.
 *
 * @param self - Resolver that does the actual work for requests not found in storage.
 * @param options - Failure/Success schemas plus the storage options handed to the factory.
 * @returns An effect producing the caching resolver. It requires a
 * `SchemaStorageFactory` and a `Scope`, and can fail with `StorageError`
 * while the store is being created.
 */
export const persisted = <
EI,
EA,
AI,
AA,
Req extends Request.Request<EA, AA> & {
readonly _tag: string
} & PrimaryKey.PrimaryKey,
>(
self: RequestResolver.RequestResolver<Req, never>,
options: PersistedResolverOptions<EI, EA, AI, AA>,
): Effect.Effect<
SchemaStorageFactory | Scope.Scope,
StorageError,
RequestResolver.RequestResolver<Req, never>
> =>
Effect.gen(function* (_) {
const resultSchema = Schema.either(options.Failure, options.Success)
const storage = yield* _(
(yield* _(SchemaStorageFactory)).make(resultSchema, options),
)
// Storage key: request tag and primary key joined by ":".
const requestKey = (request: Req) =>
`${request._tag}:${request[PrimaryKey.symbol]()}`
// Splits a batch into [requests without a stored result, [request, stored result] pairs].
// If the lookup itself fails, every request is treated as not stored.
const partition = (requests: ReadonlyArray<Req>) =>
storage.getAll(requests.map(requestKey)).pipe(
Effect.map(
ReadonlyArray.partitionMap((_, i) =>
Option.match(_, {
onNone: () => Either.left(requests[i]),
onSome: (_) => Either.right([requests[i], _] as const),
}),
),
),
Effect.orElseSucceed(() => [requests, []] as const),
)
// Best-effort write of a request's outcome: storage errors are logged and
// ignored, so this effect never fails.
const set = (
request: Req,
result: Request.Request.Result<Req>,
): Effect.Effect<never, never, void> => {
const key = requestKey(request)
if (result._tag === "Failure") {
// Only a typed failure value is stored (as a Left); when
// `Cause.failureOrCause` yields a cause instead, nothing is written.
return Either.match(Cause.failureOrCause(result.cause), {
onLeft: (e) => Effect.ignoreLogged(storage.set(key, Either.left(e))),
onRight: (_cause) => Effect.unit,
})
}
return Effect.ignoreLogged(storage.set(key, Either.right(result.value)))
}
return RequestResolver.makeBatched((requests: Array<Req>) =>
Effect.flatMap(partition(requests), ([remaining, results]) => {
// Complete every request that had a stored result. The stored Either is
// passed where `Request.completeEffect` expects an effect, hence the cast.
const completeCached = Effect.forEach(
results,
([request, result]) =>
Request.completeEffect(request, result as any) as Effect.Effect<
never,
never,
void
>,
{ discard: true },
)
// Run the remaining requests through `self` (with batching), capturing
// each outcome as an Exit so one failure does not fail the whole batch.
const completeUncached = pipe(
Effect.forEach(
remaining,
(request) => Effect.exit(Effect.request(request, self)),
{ batching: true },
),
Effect.flatMap((results) =>
Effect.forEach(
results,
(result, i) => {
// `results` is index-aligned with `remaining`.
const request = remaining[i]
// Offer the outcome to storage first, then complete the request with it.
return Effect.zipRight(
set(request, result as any),
Request.complete(request, result as any),
)
},
{ discard: true },
),
),
)
return Effect.zipRight(completeCached, completeUncached)
}),
)
})
// memory storage
/**
 * In-memory `BackingStorageFactory`.
 *
 * Each `make` call creates its own fresh `Map`, so stores are never shared and
 * nothing outlives the process. The storage options are ignored.
 */
const MemoryJsonStorageLive = Layer.succeed(
  BackingStorageFactory,
  BackingStorageFactory.of({
    make: (_options) =>
      Effect.gen(function* (_) {
        const map = new Map<string, unknown>()
        return {
          // Presence is decided with `has`, not by inspecting the value, so a
          // stored `null` is returned as `Some(null)` instead of being
          // mistaken for a missing entry.
          getAll: (keys) =>
            Effect.sync(() =>
              keys.map((key) =>
                map.has(key) ? Option.some(map.get(key)) : Option.none(),
              ),
            ),
          // Block bodies keep the Map/boolean results of `set`/`delete` from
          // leaking into effects that are declared to produce `void`.
          set: (key, value) =>
            Effect.sync(() => {
              map.set(key, value)
            }),
          delete: (key) =>
            Effect.sync(() => {
              map.delete(key)
            }),
        }
      }),
  }),
)
/** `SchemaStorageFactory` layer backed by the in-memory storage. */
const MemorySchemaStorageLive = pipe(
  SchemaStorageFactoryLive,
  Layer.use(MemoryJsonStorageLive),
)
// lmdb storage
/**
 * Builds an LMDB-backed `BackingStorageFactory`.
 *
 * The root database is opened once and closed when the enclosing scope
 * closes. Every store created by the factory is a named sub-database (named
 * after `storeId`) that is closed with the scope it was created in.
 *
 * @param options - LMDB root database options, including the path.
 * @returns A scoped effect producing the factory.
 */
const makeLmdb = (options: Lmdb.RootDatabaseOptionsWithPath) =>
  Effect.gen(function* (_) {
    // NOTE(review): `Lmdb.open` is left unguarded on purpose. Wrapping it would
    // add `StorageError` to this effect's (and the derived layer's) error
    // type, which callers may not expect.
    const lmdb = Lmdb.open(options)
    yield* _(Effect.addFinalizer(() => Effect.promise(() => lmdb.close())))

    // Shared handler: wraps whatever an LMDB call rejected with as an
    // "Unknown" StorageError attributed to `method`.
    const unknownError = (method: string) => (cause: unknown) =>
      new StorageError({ method, reason: "Unknown", message: `${cause}` })

    return BackingStorageFactory.of({
      make: (storeOptions) =>
        Effect.gen(function* (_) {
          // `openDB` is synchronous and may throw; report that as a typed
          // "Construction" error instead of letting it escape as a defect.
          const db = yield* _(
            Effect.try({
              try: () => lmdb.openDB({ name: storeOptions.storeId }),
              catch: (cause) =>
                new StorageError({
                  method: "make",
                  reason: "Construction",
                  message: `${cause}`,
                }),
            }),
          )
          yield* _(Effect.addFinalizer(() => Effect.promise(() => db.close())))
          return {
            getAll: (keys) =>
              Effect.map(
                Effect.tryPromise({
                  try: () => db.getMany(keys),
                  catch: unknownError("getAll"),
                }),
                // Only `undefined` counts as a missing entry, so a stored
                // `null` is still reported as present.
                (values) =>
                  values.map(
                    (value: unknown): Option.Option<unknown> =>
                      value === undefined ? Option.none() : Option.some(value),
                  ),
              ),
            set: (key, value) =>
              Effect.tryPromise({
                try: () => db.put(key, value),
                catch: unknownError("set"),
              }),
            delete: (key) =>
              Effect.tryPromise({
                try: () => db.remove(key),
                catch: unknownError("delete"),
              }),
          }
        }),
    })
  })
/**
 * Scoped layer providing the LMDB-backed `BackingStorageFactory`; the database
 * resources are released when the layer's scope closes.
 */
const LmdbStorageLive = (options: Lmdb.RootDatabaseOptionsWithPath) => {
  const acquireFactory = makeLmdb(options)
  return Layer.scoped(BackingStorageFactory, acquireFactory)
}
/** `SchemaStorageFactory` layer backed by LMDB opened with the given options. */
const LmdbSchemaStorageLive = (options: Lmdb.RootDatabaseOptionsWithPath) =>
  pipe(SchemaStorageFactoryLive, Layer.use(LmdbStorageLive(options)))
// usage
/** Example domain model: a schema class with a numeric `id` and a string `name`. */
class User extends Schema.Class<User>()({
id: Schema.number,
name: Schema.string,
}) {}
/**
 * Example request (tag "MyRequest") that resolves a `User` by id and declares
 * `never` as its error type. The primary key is the id rendered as a string.
 */
class MyRequest extends Request.TaggedClass("MyRequest")<
  never,
  User,
  { readonly id: number }
> {
  [PrimaryKey.symbol](): string {
    return String(this.id)
  }
}
/**
 * Example resolver: answers each request in the batch with a `User` carrying
 * the requested id and the fixed name "name".
 */
const baseResolver = RequestResolver.fromFunctionBatched(
  (requests: Array<MyRequest>) =>
    requests.map(({ id }) => new User({ id, name: "name" })),
)
/**
 * Example wiring: `baseResolver` wrapped by `persisted`, using the store id
 * "user". `Failure` is `Schema.never` because `MyRequest` declares `never` as
 * its error type; successes are described by the `User` schema.
 */
const persistedResolver: Effect.Effect<
SchemaStorageFactory | Scope.Scope,
StorageError,
RequestResolver.RequestResolver<MyRequest, never>
> = persisted(baseResolver, {
Failure: Schema.never,
Success: User,
storeId: "user",
})
// Demo program: issues the same batch of requests twice against the persisted
// resolver and prints both results, using LMDB at "./db" as the store.
pipe(
  Effect.gen(function* ($) {
    const resolver = yield* $(persistedResolver)
    const getUsers = Effect.forEach(
      ReadonlyArray.range(0, 10),
      (id) => Effect.request(new MyRequest({ id }), resolver),
      { batching: true },
    )

    const firstPass = yield* $(getUsers)
    console.log(firstPass)

    // Same requests again; results written during the first pass are now
    // expected to be available from storage.
    const secondPass = yield* $(getUsers)
    console.log(secondPass)
  }),
  Effect.scoped,
  Effect.provide(LmdbSchemaStorageLive({ path: "./db" })),
  Effect.tapErrorCause(Effect.logError),
  Effect.runFork,
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment