Last active
May 10, 2023 02:51
-
-
Save b0o/b5ac3e1880ea413108945775165a1ccd to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Creates an async generator from a subscription procedure | |
* | |
* @param procedure - The procedure to subscribe to | |
* @param opts - Options for the subscription | |
* @param opts.isDone - A function that determines if the subscription is done, | |
* called for each value the subscription emits. | |
* | |
* @returns An async generator | |
* | |
* @example | |
* ```typescript | |
* const myProcedureGen = getSubscriptionGenerator(api.myRouter.mySubscriptionProcedure, { | |
* isDone: ({ done }) => done, | |
* }) | |
* const myProcedureInstance = myProcedureGen({ some: 'input' }) | |
* for await (const value of myProcedureInstance.start()) { | |
* console.log(value) | |
* if (value === 'some value') { | |
* myProcedureInstance.stop() // optionally stop the subscription early | |
* } | |
* } | |
* ``` | |
*/ | |
export function getSubscriptionGenerator<TInput, TValue, TError>( | |
procedure: SubscriptionProcedure<TInput, TValue, TError>, | |
opts: IterableSubscriptionOptions<TValue>, | |
) { | |
return function (input: TInput) { | |
return new IterableSubscription<TInput, TValue, TError>(procedure, input, opts) | |
} | |
} | |
export interface Unsubscribable { | |
unsubscribe: () => void | |
} | |
export type inferSubscription<T> = T extends { | |
subscribe: ( | |
arg: infer TInput, | |
opts: { | |
onData: (value: infer TValue) => void | |
onError: (error: infer TError) => void | |
}, | |
) => Unsubscribable | |
} | |
? IterableSubscription<TInput, TValue, TError> | |
: never | |
export interface TRPCSubscriptionObserver<TValue, TError> { | |
onStarted: () => void | |
onData: (value: TValue) => void | |
onError: (err: TError) => void | |
onStopped: () => void | |
onComplete: () => void | |
} | |
export interface SubscriptionProcedure<TInput, TValue, TError> { | |
subscribe: ( | |
input: TInput, | |
opts: TRPCSubscriptionObserver<TValue, TError>, | |
) => { | |
unsubscribe: () => void | |
} | |
} | |
export type SubscriptionDoneCallback<TValue> = (value: TValue) => boolean | |
export const neverDone = () => false | |
export interface IterableSubscriptionOptions<TValue> { | |
isDone: SubscriptionDoneCallback<TValue> | |
} | |
export class IterableSubscription<TInput, TValue, TError> { | |
private procedure: SubscriptionProcedure<TInput, TValue, TError> | |
private input: TInput | |
private opts: { isDone: (val: TValue) => boolean } | |
private state: { | |
started: boolean | |
done: boolean | |
stopped: boolean | |
completed: boolean | |
error: TError | null | |
data: TValue | null | |
next: Promise<TValue> | null | |
resolve?: ((value: TValue) => void) | null | |
reject?: ((err: TError) => void) | null | |
unsubscribe?: (() => void) | null | |
} | |
constructor( | |
procedure: SubscriptionProcedure<TInput, TValue, TError>, | |
input: TInput, | |
opts: IterableSubscriptionOptions<TValue> = { | |
isDone: neverDone, | |
}, | |
) { | |
this.procedure = procedure | |
this.input = input | |
this.opts = opts | |
this.state = { | |
started: false, | |
done: false, | |
stopped: false, | |
completed: false, | |
error: null, | |
data: null, | |
next: null, | |
resolve: null, | |
reject: null, | |
unsubscribe: null, | |
} | |
} | |
async *start() { | |
this.state.next = new Promise((resolve, reject) => { | |
this.state.resolve = resolve | |
this.state.reject = reject | |
const { unsubscribe } = this.procedure.subscribe(this.input, { | |
onStarted: () => { | |
this.state.started = true | |
}, | |
onData: (value: TValue) => { | |
this.state.data = value | |
const { resolve } = this.state | |
if (this.opts.isDone(value)) { | |
this.state.done = true | |
this.stop() | |
} else { | |
this.state.next = new Promise((resolve, reject) => { | |
this.state.resolve = resolve | |
this.state.reject = reject | |
}) | |
} | |
if (resolve) resolve(value) | |
}, | |
onError: (err: TError) => { | |
this.state.error = err | |
if (this.state.reject) { | |
reject(err) | |
} else { | |
// eslint-disable-next-line @typescript-eslint/no-throw-literal | |
throw err | |
} | |
}, | |
onStopped: () => { | |
this.state.stopped = true | |
}, | |
onComplete: () => { | |
this.state.completed = true | |
}, | |
}) | |
this.state.unsubscribe = unsubscribe | |
}) | |
while (!this.state.stopped && !this.state.completed && !this.state.done) { | |
const val = await this.state.next | |
yield val | |
} | |
this.stop() | |
} | |
stop() { | |
if (this.state.unsubscribe) { | |
this.state.unsubscribe() | |
} | |
this.state.unsubscribe = undefined | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment