Skip to content

Instantly share code, notes, and snippets.

@imhoffd
Created March 27, 2022 21:11
Show Gist options
  • Save imhoffd/379bce434c4bf5c562e62be92e50f0c9 to your computer and use it in GitHub Desktop.
Save imhoffd/379bce434c4bf5c562e62be92e50f0c9 to your computer and use it in GitHub Desktop.
AsyncIterableSubject
import AsyncIterableSubject from '../AsyncIterableSubject'
// Subject under test; re-created in beforeEach so every test starts from a fresh instance.
let subject: AsyncIterableSubject<number>
/**
 * Opens a new async iterator on `subject` and requests a single result.
 *
 * @param subject - The subject to pull from.
 * @returns A promise for the first iterator result, whether a value or `done`.
 */
const iterateAndReturnFirst = async <T>(
  subject: AsyncIterableSubject<T>,
): Promise<IteratorResult<T>> => subject[Symbol.asyncIterator]().next()
// Give every test its own subject so completion state and queued values never leak between tests.
beforeEach(() => {
subject = new AsyncIterableSubject()
})
// Completion arriving while the consumer is already waiting must end the iteration.
it('should not iterate after completing', async () => {
// Deferred with setImmediate so complete() runs after the iterator below has started waiting.
setImmediate(() => {
subject.complete()
})
const result = await iterateAndReturnFirst(subject)
expect(result.done).toBe(true)
})
// Completion that happened before iteration began must be reported immediately.
it('should not iterate if completed', async () => {
subject.complete()
const result = await iterateAndReturnFirst(subject)
expect(result.done).toBe(true)
})
// Repeated complete() calls must be harmless.
it('should not iterate if complete() is called multiple times', async () => {
for (const _ of [1, 2, 3]) {
subject.complete()
}
const result = await iterateAndReturnFirst(subject)
expect(result.done).toBe(true)
})
// Publishing on a completed subject is a programming error and must throw.
it('should throw an error if sending value after complete', async () => {
subject.complete()
expect(() => subject.next(1)).toThrowError(/after completion/)
})
// Publishing before any consumer has started waiting must throw.
it('should throw an error if sending value before iterating', async () => {
expect(() => subject.next(1)).toThrowError(/before iterating/)
})
// A single value followed by completion yields exactly that value, then the loop ends on its own.
it('should iterate once with a value before completing', async () => {
// Guards against the loop body never running, which would otherwise pass vacuously.
expect.hasAssertions()
// Deferred so the for-await below is already waiting when next() is called.
setImmediate(() => {
subject.next(1)
subject.complete()
})
for await (const value of subject) {
expect(value).toBe(1)
}
})
// Without completion the consumer has to leave the loop itself after the first value.
it('should iterate once with a value', async () => {
expect.hasAssertions()
setImmediate(() => {
subject.next(1)
})
for await (const value of subject) {
expect(value).toBe(1)
// The subject is never completed here, so break to avoid waiting forever.
break
}
})
// Several values pushed in the same tick must all be delivered, in order.
it('should allow values sent synchronously', async () => {
expect.hasAssertions()
setImmediate(() => {
subject.next(1)
subject.next(2)
subject.next(3)
})
let expected = 1
for await (const value of subject) {
expect(value).toBe(expected)
// The subject is never completed here, so stop once the last expected value has arrived.
if (expected === 3) {
break
}
expected++
}
})
import Deferred from './Deferred'
/**
 * A push-based source of values that is consumed with `for await`.
 *
 * Producers publish with `next(value)` and end the stream with `complete()`.
 * Consumers pull values through the async iterator, which waits until a value
 * is available or the subject completes.
 */
export default class AsyncIterableSubject<T> implements AsyncIterable<T> {
  // Wake-up signal for a waiting consumer; `null` once the subject has completed.
  private _signal: Deferred<void> | null = new Deferred()
  // Values that have been published but not yet handed to a consumer.
  private _queue: T[] = []

  /**
   * Publishes a value to the consumer.
   *
   * @param value - The value to enqueue.
   * @throws Error if `complete()` has already been called, or if no consumer
   *   has started waiting on the current signal yet.
   */
  next(value: T) {
    if (!this._signal) {
      throw new Error('Cannot call next() after completion')
    }
    if (!this._signal.awaited) {
      throw new Error('Cannot call next() before iterating')
    }
    this._queue.push(value)
    this._signal.resolve()
  }

  /**
   * Ends the stream. Values already queued are still delivered before the
   * iterator reports `done`. Calling this more than once has no further effect.
   */
  complete() {
    // Only reject when a consumer has taken the promise, so the rejection is
    // always observed by the waiting iterator.
    if (this._signal?.awaited) {
      this._signal.reject()
    }
    this._signal = null
  }

  /**
   * Creates an iterator over the published values. All iterators share the
   * same queue, so each value is delivered to exactly one `next()` call.
   */
  [Symbol.asyncIterator](): AsyncIterator<T, null> {
    return {
      next: async () => {
        for (;;) {
          // Drain buffered values first, so values pushed in the same tick as
          // complete() are not lost.
          if (this._queue.length > 0) {
            const value = this._queue.shift() as T
            // Once drained, arm a fresh signal for the next wait, unless the
            // subject has completed in the meantime.
            if (this._signal && this._queue.length === 0) {
              this._signal = new Deferred()
            }
            return { done: false, value }
          }

          const signal = this._signal
          if (!signal) {
            return { done: true, value: null }
          }

          try {
            // Delay until a value is published or the subject completes.
            await signal.promise
          } catch {
            // complete() rejects the signal to wake a waiting consumer.
            return { done: true, value: null }
          }
          // Woken by a publish: loop back to take the value. If another
          // pending next() call already took it, wait again rather than
          // yielding `undefined`.
        }
      },
    }
  }
}
/** Signature of a function that resolves a promise of `T`. */
export type Resolve<T> = (value: T | PromiseLike<T>) => void
/** Signature of a function that rejects a promise; mirrors the Promise executor's `reject`. */
export type Reject = (reason?: any) => void

/**
 * A promise whose `resolve` and `reject` functions are exposed as methods, so
 * it can be settled from outside the executor.
 *
 * The state flags record only the first settlement. Later `resolve()` or
 * `reject()` calls are ignored by the underlying promise and leave the flags
 * unchanged, so `fulfilled` and `rejected` are never both `true`.
 */
export default class Deferred<T> {
  /** `true` once the `promise` getter has been read at least once. */
  awaited = false
  /** `true` if `resolve()` was the first settling call. */
  fulfilled = false
  /** `true` if `reject()` was the first settling call. */
  rejected = false
  /** Resolves the underlying promise; a no-op after the first settlement. */
  resolve!: Resolve<T>
  /** Rejects the underlying promise; a no-op after the first settlement. */
  reject!: Reject

  private _promise: Promise<T> = new Promise<T>(
    (resolve: Resolve<T>, reject: Reject) => {
      this.resolve = value => {
        // Record only the first settlement so the flags match the promise.
        if (!this.fulfilled && !this.rejected) {
          this.fulfilled = true
        }
        resolve(value)
      }
      this.reject = reason => {
        if (!this.fulfilled && !this.rejected) {
          this.rejected = true
        }
        reject(reason)
      }
    },
  )

  /**
   * The underlying promise. Reading it sets `awaited`, which lets owners tell
   * whether anyone has taken the promise yet.
   */
  get promise() {
    this.awaited = true
    return this._promise
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment