/**
 * Builds a factory that wraps a subscription procedure in an
 * `IterableSubscription`.
 *
 * The returned factory captures `procedure` and `opts`; each call with an
 * input creates a fresh `IterableSubscription` for that input.
 *
 * @param procedure The procedure to subscribe to
 * @param opts Options passed to every `IterableSubscription` the factory creates
 * @returns A function that takes the procedure input and returns an
 * `IterableSubscription`; call `start()` on it to obtain the async generator
 * @example
 * ```ts
 * const myProcedureGen = getSubscriptionGenerator(mySubscriptionProcedure, {
 *   isDone: ({ done }) => done,
 * })
 * const myProcedure = myProcedureGen({ some: 'input' })
 * for await (const value of myProcedure.start()) {
 *   console.log(value)
 *   if (value === 'some value') {
 *     myProcedure.stop()
 *   }
 * }
 * ```
 */
export function getSubscriptionGenerator<TInput, TValue, TError>(
  procedure: SubscriptionProcedure<TInput, TValue, TError>,
  opts: IterableSubscriptionOptions,
) {
  function createSubscription(input: TInput) {
    const subscription = new IterableSubscription<TInput, TValue, TError>(
      procedure,
      input,
      opts,
    )
    return subscription
  }

  return createSubscription
}
/** Handle returned by a `subscribe` call. */
export interface Unsubscribable {
  /** Callback that tears the subscription down. */
  unsubscribe: () => void
}
/**
 * Derives the `IterableSubscription` type that corresponds to a subscription
 * procedure type.
 *
 * `T` is matched against an object with a `subscribe(arg, opts)` function whose
 * `opts` carries `onData` / `onError` callbacks and which returns an
 * `Unsubscribable`. The input, value and error types are inferred from that
 * signature. Resolves to `never` when `T` does not match.
 *
 * @typeParam T The subscription procedure type to inspect
 */
export type inferSubscription<T> = T extends {
  subscribe: (
    arg: infer TInput,
    opts: {
      onData: (value: infer TValue) => void
      onError: (error: infer TError) => void
    },
  ) => Unsubscribable
}
  ? IterableSubscription<TInput, TValue, TError>
  : never
/**
 * Callbacks a subscription procedure invokes to report lifecycle events.
 *
 * @typeParam TValue Type of the values delivered through `onData`
 * @typeParam TError Type of the errors delivered through `onError`
 */
export interface TRPCSubscriptionObserver<TValue, TError> {
  /** Signals that the subscription has started. */
  onStarted: () => void
  /** Delivers one value produced by the subscription. */
  onData: (value: TValue) => void
  /** Delivers an error raised by the subscription. */
  onError: (err: TError) => void
  /** Signals that the subscription was stopped. */
  onStopped: () => void
  /** Signals that the subscription completed. */
  onComplete: () => void
}
/**
 * Shape of a procedure that can be subscribed to.
 *
 * @typeParam TInput Type of the input handed to `subscribe`
 * @typeParam TValue Type of the values reported to the observer
 * @typeParam TError Type of the errors reported to the observer
 */
export interface SubscriptionProcedure<TInput, TValue, TError> {
  /**
   * Starts a subscription for `input`, reporting events to `opts`, and returns
   * a handle whose `unsubscribe` ends it.
   */
  subscribe: (
    input: TInput,
    opts: TRPCSubscriptionObserver<TValue, TError>,
  ) => {
    unsubscribe: () => void
  }
}
/**
 * Predicate that receives a subscription value and reports whether it is the
 * final one.
 *
 * `TValue` defaults to `any` (rather than `unknown`) so that references which
 * omit the type argument keep accepting callbacks that inspect the value
 * directly, e.g. `({ done }) => done`.
 *
 * @typeParam TValue Type of the values the predicate inspects
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export type SubscriptionDoneCallback<TValue = any> = (value: TValue) => boolean

/** `isDone` predicate that always returns `false`, i.e. no value is final. */
export const neverDone = (): boolean => false

/**
 * Options for an iterable subscription.
 *
 * @typeParam TValue Type of the values passed to `isDone`; defaults to `any`
 * so existing un-parameterised references remain valid
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export interface IterableSubscriptionOptions<TValue = any> {
  /** Returns `true` for the value that should end the iteration. */
  isDone: SubscriptionDoneCallback<TValue>
}
/**
 * Exposes a callback-based subscription procedure as an async generator.
 *
 * Values reported by the procedure are buffered until the consumer pulls
 * them, so a producer that emits faster than the consumer iterates (including
 * synchronously inside `subscribe`) does not lose values.
 *
 * @typeParam TInput Type of the input passed to the procedure
 * @typeParam TValue Type of the values yielded by the generator
 * @typeParam TError Type of the errors the procedure may report
 */
export class IterableSubscription<TInput, TValue, TError> {
  private procedure: SubscriptionProcedure<TInput, TValue, TError>
  private input: TInput
  private opts: { isDone: (val: TValue) => boolean }

  private state: {
    /** Set once the procedure reports `onStarted`. */
    started: boolean
    /** Set once `isDone` returned `true` for a value. */
    done: boolean
    /** Set once the procedure reports `onStopped` or `stop()` ended a live subscription. */
    stopped: boolean
    /** Set once the procedure reports `onComplete`. */
    completed: boolean
    /** Set once the procedure reports `onError`; kept separate so a falsy error still counts. */
    failed: boolean
    /** Last error reported by the procedure. */
    error: TError | null
    /** Last value reported by the procedure. */
    data: TValue | null
    /** Values received but not yet yielded. */
    queue: TValue[]
    /** Resolver of the promise the generator is currently parked on, if any. */
    wake: (() => void) | null
    /** Teardown callback of the live subscription, if any. */
    unsubscribe?: (() => void) | null
  }

  /**
   * @param procedure The procedure to subscribe to when `start()` is called
   * @param input The input passed to `procedure.subscribe`
   * @param opts Options; `isDone` marks the value that ends the iteration
   * (defaults to `neverDone`)
   */
  constructor(
    procedure: SubscriptionProcedure<TInput, TValue, TError>,
    input: TInput,
    opts: IterableSubscriptionOptions = {
      isDone: neverDone,
    },
  ) {
    this.procedure = procedure
    this.input = input
    this.opts = opts
    this.state = {
      started: false,
      done: false,
      stopped: false,
      completed: false,
      failed: false,
      error: null,
      data: null,
      queue: [],
      wake: null,
      unsubscribe: null,
    }
  }

  /**
   * Subscribes to the procedure and yields every value it reports.
   *
   * The generator finishes after the buffered values are drained once the
   * procedure stops or completes, or once `isDone` flags a value (that value
   * is still yielded). It finishes without draining when `stop()` is called.
   * An error reported through `onError` is thrown to the consumer after the
   * values received before it. The subscription is released whenever the
   * generator ends, including when the consumer breaks out of the loop.
   *
   * @returns An async generator over the subscription values
   * @throws The `TError` value the procedure passed to `onError`
   */
  async *start(): AsyncGenerator<TValue, void, unknown> {
    const { unsubscribe } = this.procedure.subscribe(this.input, {
      onStarted: () => {
        this.state.started = true
      },
      onData: (value: TValue) => {
        // Anything arriving after the terminal value or after a stop is ignored.
        if (this.state.done || this.state.stopped) return
        this.state.data = value
        this.state.queue.push(value)
        if (this.opts.isDone(value)) {
          this.state.done = true
          // Unsubscribe without discarding the queue: the terminal value
          // must still reach the consumer.
          this.release()
        }
        this.wakeConsumer()
      },
      onError: (err: TError) => {
        this.state.error = err
        this.state.failed = true
        this.wakeConsumer()
      },
      onStopped: () => {
        this.state.stopped = true
        this.wakeConsumer()
      },
      onComplete: () => {
        this.state.completed = true
        this.wakeConsumer()
      },
    })
    this.state.unsubscribe = unsubscribe

    try {
      for (;;) {
        while (this.state.queue.length > 0) {
          // The length check guarantees an element; `shift()` is only typed
          // as possibly `undefined`.
          yield this.state.queue.shift() as TValue
        }
        if (this.state.failed) {
          // eslint-disable-next-line @typescript-eslint/no-throw-literal
          throw this.state.error
        }
        if (this.state.stopped || this.state.completed || this.state.done) {
          return
        }
        // No await happens between the checks above and registering `wake`,
        // so no event can slip in unnoticed.
        await new Promise<void>((resolve) => {
          this.state.wake = resolve
        })
      }
    } finally {
      this.stop()
    }
  }

  /**
   * Ends a live subscription: unsubscribes, discards values that were
   * buffered but not yet yielded, and lets a running generator finish.
   * Does nothing when there is no live subscription.
   */
  stop(): void {
    if (!this.release()) return
    this.state.stopped = true
    this.state.queue.length = 0
    this.wakeConsumer()
  }

  /** Calls and clears the stored unsubscribe callback; returns whether one existed. */
  private release(): boolean {
    const { unsubscribe } = this.state
    // Cleared before the call so a re-entrant stop cannot unsubscribe twice.
    this.state.unsubscribe = undefined
    if (!unsubscribe) return false
    unsubscribe()
    return true
  }

  /** Resumes the generator if it is parked waiting for the next event. */
  private wakeConsumer(): void {
    const { wake } = this.state
    this.state.wake = null
    if (wake) wake()
  }
}