Created January 9, 2019
class IteratorSubscriber extends Subscriber {
constructor(observer, generator) {
this.generator = generator
step(key, arg) {
// Unsubscribe now.
if (this.subscription) {
this.subscription = null
let result
try {
result = this.iterator![key](arg)
} catch (error) {
const { value, done } = result
if (done) {
// We're going to create an observable from the value and `subscribe` to
// it. It is possible that our subscription's `next` method is called
// before `subscribe` returns the subscription. This happens for
// "synchronous" observables, e.g. arrays. In such cases, the `next`
// method will have already saved a subscription for an observable later
// in the generator's execution, and we don't want to override it with
// our subscription. Thus, we save our subscription to a temporary
// variable and copy it to `this.subscription` only after checking that
// it does not already hold another subscription.
// The observables proposal adds a `start` method to the `Observer` type
// that accepts the `Subscription` before `next`, `complete`, or `error`
// are ever called. That will solve our problem, but no implementation is
// yet available.
// Being able to cancel the subscription before it completes will let us
// stop counting values and lose the `take` operator. It will let us
// assume in our completion callback that no values have been sent.
let count = 0
const subscription = from(value)
value => {
count += 1
if (count > 1) {
// This means `take` failed to cancel our subscription before
// a second value was delivered.
console.error('awaited observable returned too many values')
// Recurse.
this.step('next', value)
error => this.step('throw', error),
() => {
if (count < 1) {
// This stops progress. How should we handle it?
console.error('awaited observable completed with no value')
if (!this.subscription) {
this.subscription = subscription
unsubscribe() {
this.iterator = null
if (this.subscription) {
this.subscription = null
