Skip to content

Instantly share code, notes, and snippets.

@b0o
Last active May 10, 2023 02:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save b0o/b5ac3e1880ea413108945775165a1ccd to your computer and use it in GitHub Desktop.
Save b0o/b5ac3e1880ea413108945775165a1ccd to your computer and use it in GitHub Desktop.
/**
* Creates an async generator from a subscription procedure
*
* @param procedure - The procedure to subscribe to
* @param opts - Options for the subscription
* @param opts.isDone - A function that determines if the subscription is done,
* called for each value the subscription emits.
*
* @returns An async generator
*
* @example
* ```typescript
* const myProcedureGen = getSubscriptionGenerator(api.myRouter.mySubscriptionProcedure, {
* isDone: ({ done }) => done,
* })
* const myProcedureInstance = myProcedureGen({ some: 'input' })
* for await (const value of myProcedureInstance.start()) {
* console.log(value)
* if (value === 'some value') {
* myProcedureInstance.stop() // optionally stop the subscription early
* }
* }
* ```
*/
export function getSubscriptionGenerator<TInput, TValue, TError>(
procedure: SubscriptionProcedure<TInput, TValue, TError>,
opts: IterableSubscriptionOptions<TValue>,
) {
return function (input: TInput) {
return new IterableSubscription<TInput, TValue, TError>(procedure, input, opts)
}
}
export interface Unsubscribable {
unsubscribe: () => void
}
export type inferSubscription<T> = T extends {
subscribe: (
arg: infer TInput,
opts: {
onData: (value: infer TValue) => void
onError: (error: infer TError) => void
},
) => Unsubscribable
}
? IterableSubscription<TInput, TValue, TError>
: never
export interface TRPCSubscriptionObserver<TValue, TError> {
onStarted: () => void
onData: (value: TValue) => void
onError: (err: TError) => void
onStopped: () => void
onComplete: () => void
}
export interface SubscriptionProcedure<TInput, TValue, TError> {
subscribe: (
input: TInput,
opts: TRPCSubscriptionObserver<TValue, TError>,
) => {
unsubscribe: () => void
}
}
export type SubscriptionDoneCallback<TValue> = (value: TValue) => boolean
export const neverDone = () => false
export interface IterableSubscriptionOptions<TValue> {
isDone: SubscriptionDoneCallback<TValue>
}
export class IterableSubscription<TInput, TValue, TError> {
private procedure: SubscriptionProcedure<TInput, TValue, TError>
private input: TInput
private opts: { isDone: (val: TValue) => boolean }
private state: {
started: boolean
done: boolean
stopped: boolean
completed: boolean
error: TError | null
data: TValue | null
next: Promise<TValue> | null
resolve?: ((value: TValue) => void) | null
reject?: ((err: TError) => void) | null
unsubscribe?: (() => void) | null
}
constructor(
procedure: SubscriptionProcedure<TInput, TValue, TError>,
input: TInput,
opts: IterableSubscriptionOptions<TValue> = {
isDone: neverDone,
},
) {
this.procedure = procedure
this.input = input
this.opts = opts
this.state = {
started: false,
done: false,
stopped: false,
completed: false,
error: null,
data: null,
next: null,
resolve: null,
reject: null,
unsubscribe: null,
}
}
async *start() {
this.state.next = new Promise((resolve, reject) => {
this.state.resolve = resolve
this.state.reject = reject
const { unsubscribe } = this.procedure.subscribe(this.input, {
onStarted: () => {
this.state.started = true
},
onData: (value: TValue) => {
this.state.data = value
const { resolve } = this.state
if (this.opts.isDone(value)) {
this.state.done = true
this.stop()
} else {
this.state.next = new Promise((resolve, reject) => {
this.state.resolve = resolve
this.state.reject = reject
})
}
if (resolve) resolve(value)
},
onError: (err: TError) => {
this.state.error = err
if (this.state.reject) {
reject(err)
} else {
// eslint-disable-next-line @typescript-eslint/no-throw-literal
throw err
}
},
onStopped: () => {
this.state.stopped = true
},
onComplete: () => {
this.state.completed = true
},
})
this.state.unsubscribe = unsubscribe
})
while (!this.state.stopped && !this.state.completed && !this.state.done) {
const val = await this.state.next
yield val
}
this.stop()
}
stop() {
if (this.state.unsubscribe) {
this.state.unsubscribe()
}
this.state.unsubscribe = undefined
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment