Skip to content

Instantly share code, notes, and snippets.

@thejohnfreeman
Created January 9, 2019 20:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save thejohnfreeman/239a7c0ef79005f536138cd50aee23c3 to your computer and use it in GitHub Desktop.
Save thejohnfreeman/239a7c0ef79005f536138cd50aee23c3 to your computer and use it in GitHub Desktop.
class IteratorSubscriber extends Subscriber {
constructor(observer, generator) {
super(observer)
this.generator = generator
this.step('next')
}
step(key, arg) {
// Unsubscribe now.
if (this.subscription) {
this.subscription.unsubscribe()
this.subscription = null
}
let result
try {
result = this.iterator![key](arg)
} catch (error) {
this.error(error)
return
}
const { value, done } = result
if (done) {
this.next(value)
this.complete()
return
}
// We're going to create an observable from the value and `subscribe` to
// it. It is possible that our subscription's `next` method is called
// before `subscribe` returns the subscription. This happens for
// "synchronous" observables, e.g. arrays. In such cases, the `next`
// method will have already saved a subscription for an observable later
// in the generator's execution, and we don't want to override it with
// our subscription. Thus, we save our subscription to a temporary
// variable and copy it to `this.subscription` only after checking that
// it does not already hold another subscription.
//
// The observables proposal adds a `start` method to the `Observer` type
// that accepts the `Subscription` before `next`, `complete`, or `error`
// are ever called. That will solve our problem, but no implementation is
// yet available.
//
// Being able to cancel the subscription before it completes will let us
// stop counting values and lose the `take` operator. It will let us
// assume in our completion callback that no values have been sent.
let count = 0
const subscription = from(value)
.pipe(take(1))
.subscribe(
value => {
count += 1
if (count > 1) {
// This means `take` failed to cancel our subscription before
// a second value was delivered.
console.error('awaited observable returned too many values')
}
// Recurse.
this.step('next', value)
},
error => this.step('throw', error),
() => {
if (count < 1) {
// This stops progress. How should we handle it?
console.error('awaited observable completed with no value')
}
},
)
if (!this.subscription) {
this.subscription = subscription
}
}
unsubscribe() {
this.iterator = null
if (this.subscription) {
this.subscription.unsubscribe()
this.subscription = null
}
super.unsubscribe()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment