Skip to content

Instantly share code, notes, and snippets.

@b0o
Created May 10, 2023 02:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save b0o/3d94ee606effb446e82d9511324b69c2 to your computer and use it in GitHub Desktop.
Save b0o/3d94ee606effb446e82d9511324b69c2 to your computer and use it in GitHub Desktop.

/**
 * Builds a factory that wraps a subscription procedure in an
 * `IterableSubscription`.
 *
 * The procedure and options are captured once; every call of the returned
 * function constructs a new `IterableSubscription` for the given input.
 *
 * @param procedure The procedure to subscribe to
 * @param opts Options for the subscription, forwarded unchanged to each
 *   `IterableSubscription`
 * @returns A function that takes the procedure input and returns a new
 *   `IterableSubscription` (iterate it with `start()`, cancel it with `stop()`)
 * @example
 * const myProcedureGen = getSubscriptionGenerator(mySubscriptionProcedure, {
 *   isDone: (value) => value === 'done',
 * })
 * const myProcedure = myProcedureGen({ some: 'input' })
 * for await (const value of myProcedure.start()) {
 *   console.log(value)
 *   if (value === 'some value') {
 *     myProcedure.stop()
 *   }
 * }
 */
export function getSubscriptionGenerator<TInput, TValue, TError>(
  procedure: SubscriptionProcedure<TInput, TValue, TError>,
  opts: IterableSubscriptionOptions,
) {
  const createSubscription = function (input: TInput) {
    const subscription = new IterableSubscription<TInput, TValue, TError>(
      procedure,
      input,
      opts,
    )
    return subscription
  }
  return createSubscription
}

/**
 * Object exposing a parameterless `unsubscribe` callback.
 *
 * Used in this file as the required return shape of `subscribe` in
 * `inferSubscription`.
 */
export interface Unsubscribable {
  unsubscribe: () => void
}

/**
 * Derives the `IterableSubscription` type that corresponds to a subscribable
 * object.
 *
 * `T` is matched against an object whose `subscribe` takes an input plus an
 * options object with `onData` / `onError` callbacks and returns an
 * `Unsubscribable`. The input, value and error types are inferred from that
 * signature. Resolves to `never` when `T` does not have that shape.
 *
 * @typeParam T The subscribable object type to inspect
 */
export type inferSubscription<T> = T extends {
  subscribe: (
    arg: infer TInput,
    opts: {
      onData: (value: infer TValue) => void
      onError: (error: infer TError) => void
    },
  ) => Unsubscribable
}
  ? IterableSubscription<TInput, TValue, TError>
  : never

/**
 * Set of callbacks a `SubscriptionProcedure` receives when subscribed to.
 *
 * NOTE(review): the name suggests this mirrors tRPC's client subscription
 * callbacks — confirm against the tRPC version in use.
 *
 * @typeParam TValue Type of the values passed to `onData`
 * @typeParam TError Type of the errors passed to `onError`
 */
export interface TRPCSubscriptionObserver<TValue, TError> {
  /** Lifecycle callback; `IterableSubscription` records it as "started". */
  onStarted: () => void
  /** Receives each emitted value. */
  onData: (value: TValue) => void
  /** Receives a subscription error. */
  onError: (err: TError) => void
  /** Lifecycle callback; `IterableSubscription` records it as "stopped". */
  onStopped: () => void
  /** Lifecycle callback; `IterableSubscription` records it as "completed". */
  onComplete: () => void
}

/**
 * Minimal shape of a procedure that can be subscribed to.
 *
 * `subscribe` is called with the procedure input and a full set of observer
 * callbacks, and returns a handle whose `unsubscribe` function takes no
 * arguments.
 *
 * @typeParam TInput Type of the input passed to `subscribe`
 * @typeParam TValue Type of the values delivered to the observer
 * @typeParam TError Type of the errors delivered to the observer
 */
export interface SubscriptionProcedure<TInput, TValue, TError> {
  subscribe: (
    input: TInput,
    opts: TRPCSubscriptionObserver<TValue, TError>,
  ) => {
    unsubscribe: () => void
  }
}

/**
 * Predicate that receives a subscription value and returns `true` when that
 * value should be treated as the final one.
 *
 * @typeParam TValue Type of the value inspected. Defaults to `unknown` so the
 *   alias can still be referenced without a type argument.
 */
export type SubscriptionDoneCallback<TValue = unknown> = (value: TValue) => boolean

/** `isDone` predicate that always returns `false`, i.e. no value is ever final. */
export const neverDone = () => false

/**
 * Options for an iterable subscription.
 *
 * @typeParam TValue Type of the values passed to `isDone`. Defaults to
 *   `unknown` so the interface can still be referenced without a type argument.
 */
export interface IterableSubscriptionOptions<TValue = unknown> {
  /** Called with each received value; return `true` to mark it as the last. */
  isDone: SubscriptionDoneCallback<TValue>
}

/** Bookkeeping for one `IterableSubscription.start()` run. */
interface IterableSubscriptionState<TValue, TError> {
  started: boolean
  done: boolean
  stopped: boolean
  completed: boolean
  failed: boolean

  error: TError | null
  data: TValue | null
  /** Values received but not yet yielded to the consumer. */
  queue: TValue[]

  /** Resolver of the promise the generator is currently parked on, if any. */
  wake: (() => void) | null
  unsubscribe: (() => void) | null
}

/**
 * Adapts a callback-based subscription procedure to an async generator.
 *
 * `start()` subscribes on the first `next()` call and yields every value passed
 * to `onData`, in order. Values are buffered, so none are lost when the
 * procedure emits faster than the consumer iterates. Iteration ends after:
 *  - a value for which `opts.isDone` returns `true` (that value is yielded last),
 *  - `onStopped` or `onComplete` (already-buffered values are yielded first),
 *  - `stop()` (buffered values are discarded).
 * An `onError` call makes the generator throw the error once buffered values
 * have been yielded. The subscription is always unsubscribed when the
 * generator finishes, including when the consumer `break`s or throws.
 *
 * Each `start()` call begins a fresh run; `stop()` acts on the most recent one.
 *
 * @typeParam TInput Type of the input handed to the procedure
 * @typeParam TValue Type of the yielded values
 * @typeParam TError Type of the error thrown from the generator
 */
export class IterableSubscription<TInput, TValue, TError> {
  private readonly procedure: SubscriptionProcedure<TInput, TValue, TError>
  private readonly input: TInput
  private readonly opts: { isDone: (val: TValue) => boolean }

  private state: IterableSubscriptionState<TValue, TError>

  /**
   * @param procedure The procedure to subscribe to
   * @param input Input passed to `procedure.subscribe`
   * @param opts Subscription options; by default no value is treated as final
   */
  constructor(
    procedure: SubscriptionProcedure<TInput, TValue, TError>,
    input: TInput,
    opts: IterableSubscriptionOptions = {
      isDone: neverDone,
    },
  ) {
    this.procedure = procedure
    this.input = input
    this.opts = opts
    this.state = this.freshState()
  }

  /**
   * Subscribes to the procedure and yields its values.
   *
   * @returns An async generator over the received values
   * @throws The value passed to `onError`, rethrown as-is
   */
  async *start(): AsyncGenerator<TValue, void, unknown> {
    // Callbacks close over this run's own state so a later start() cannot
    // corrupt it.
    const run = this.freshState()
    this.state = run

    try {
      const { unsubscribe } = this.procedure.subscribe(this.input, {
        onStarted: () => {
          run.started = true
        },
        onData: (value: TValue) => {
          // Ignore anything delivered after the final value or after a stop.
          if (run.done || run.stopped) return
          const isFinal = this.opts.isDone(value)
          run.data = value
          run.queue.push(value)
          if (isFinal) {
            run.done = true
            this.release(run)
          }
          this.wake(run)
        },
        onError: (err: TError) => {
          run.error = err
          // Separate flag: the error value itself may legitimately be falsy.
          run.failed = true
          this.wake(run)
        },
        onStopped: () => {
          run.stopped = true
          this.wake(run)
        },
        onComplete: () => {
          run.completed = true
          this.wake(run)
        },
      })
      run.unsubscribe = unsubscribe
      // The final value may have arrived synchronously inside subscribe(),
      // before the unsubscribe handle existed.
      if (run.done) this.release(run)

      for (;;) {
        if (run.queue.length > 0) {
          // Length was checked above; TValue itself may include undefined.
          yield run.queue.shift() as TValue
          continue
        }
        if (run.failed) {
          // eslint-disable-next-line @typescript-eslint/no-throw-literal
          throw run.error
        }
        if (run.done || run.stopped || run.completed) return
        // Park until a callback or stop() wakes us. No await happens between
        // the checks above and this registration, so no wake-up can be missed.
        await new Promise<void>((resolve) => {
          run.wake = resolve
        })
      }
    } finally {
      // Also runs on consumer `break` / throw, so the subscription never leaks.
      this.release(run)
    }
  }

  /**
   * Cancels the current run: unsubscribes, discards undelivered values and
   * ends the generator returned by `start()`. Safe to call more than once.
   */
  stop(): void {
    const run = this.state
    run.stopped = true
    run.queue.length = 0
    this.release(run)
    this.wake(run)
  }

  /** Creates the initial bookkeeping for a run. */
  private freshState(): IterableSubscriptionState<TValue, TError> {
    return {
      started: false,
      done: false,
      stopped: false,
      completed: false,
      failed: false,
      error: null,
      data: null,
      queue: [],
      wake: null,
      unsubscribe: null,
    }
  }

  /** Resumes the generator if it is parked waiting for an event. */
  private wake(run: IterableSubscriptionState<TValue, TError>): void {
    const resume = run.wake
    run.wake = null
    if (resume) resume()
  }

  /** Unsubscribes at most once; cleared first so re-entrant calls are no-ops. */
  private release(run: IterableSubscriptionState<TValue, TError>): void {
    const unsubscribe = run.unsubscribe
    run.unsubscribe = null
    if (unsubscribe) unsubscribe()
  }
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment