class IteratorSubscriber extends Subscriber { | |
constructor(observer, generator) { | |
super(observer) | |
this.generator = generator | |
this.step('next') | |
} | |
step(key, arg) { | |
// Unsubscribe now. | |
if (this.subscription) { | |
this.subscription.unsubscribe() | |
this.subscription = null | |
} | |
let result | |
try { | |
result = this.iterator | |
} catch (error) { | |
this.error(error) | |
return | |
} | |
const { value, done } = result | |
if (done) { | |
this.next(value) | |
this.complete() | |
return | |
} | |
// We're going to create an observable from the value and `subscribe` to | |
// it. It is possible that our subscription's `next` method is called | |
// before `subscribe` returns the subscription. This happens for | |
// "synchronous" observables, e.g. arrays. In such cases, the `next` | |
// method will have already saved a subscription for an observable later | |
// in the generator's execution, and we don't want to override it with | |
// our subscription. Thus, we save our subscription to a temporary | |
// variable and copy it to `this.subscription` only after checking that | |
// it does not already hold another subscription. | |
// | |
// The observables proposal adds a `start` method to the `Observer` type | |
// that accepts the `Subscription` before `next`, `complete`, or `error` | |
// are ever called. That will solve our problem, but no implementation is | |
// yet available. | |
// | |
// Being able to cancel the subscription before it completes will let us | |
// stop counting values and lose the `take` operator. It will let us | |
// assume in our completion callback that no values have been sent. | |
let count = 0 | |
const subscription = from(value) | |
.pipe(take(1)) | |
.subscribe( | |
value => { | |
count += 1 | |
if (count > 1) { | |
// This means `take` failed to cancel our subscription before | |
// a second value was delivered. | |
console.error('awaited observable returned too many values') | |
} | |
// Recurse. | |
this.step('next', value) | |
}, | |
error => this.step('throw', error), | |
() => { | |
if (count < 1) { | |
// This stops progress. How should we handle it? | |
console.error('awaited observable completed with no value') | |
} | |
}, | |
) | |
if (!this.subscription) { | |
this.subscription = subscription | |
} | |
} | |
unsubscribe() { | |
this.iterator = null | |
if (this.subscription) { | |
this.subscription.unsubscribe() | |
this.subscription = null | |
} | |
super.unsubscribe() | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment