Created
January 9, 2019 20:51
-
-
Save thejohnfreeman/239a7c0ef79005f536138cd50aee23c3 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Subscriber that drives an iterator (typically a generator object) to
 * completion. Every value the iterator yields is converted to an observable
 * with `from`, its first emission is fed back into the iterator, and an error
 * from it is thrown into the iterator. When the iterator finishes, its return
 * value is emitted through `next` and the subscriber completes.
 *
 * NOTE(review): `Subscriber`, `from` and `take` are not defined in this file;
 * they look like the RxJS exports of the same names - confirm the imports.
 */
class IteratorSubscriber extends Subscriber {
  /**
   * @param observer  forwarded unchanged to the `Subscriber` constructor
   * @param generator iterator to drive; its `next` and `throw` methods are
   *                  invoked by `step`
   */
  constructor(observer, generator) {
    super(observer)
    // Kept for backward compatibility with code that reads `this.generator`.
    this.generator = generator
    // `step` and `unsubscribe` work on `this.iterator`; it used to be left
    // unset, so the very first step failed with a TypeError.
    this.iterator = generator
    // Subscription to the observable currently being awaited, if any.
    this.subscription = null
    this.step('next')
  }

  /**
   * Resume the iterator and subscribe to whatever it yields next.
   *
   * @param key name of the iterator method to call: 'next' or 'throw'
   * @param arg value (for 'next') or error (for 'throw') handed to the iterator
   */
  step(key, arg) {
    // Unsubscribe now.
    if (this.subscription) {
      this.subscription.unsubscribe()
      this.subscription = null
    }

    // `unsubscribe` clears the iterator. A callback that still fires
    // afterwards must not resume anything.
    const iterator = this.iterator
    if (!iterator) {
      return
    }

    let result
    try {
      result = iterator[key](arg)
    } catch (error) {
      this.error(error)
      return
    }

    const { value, done } = result
    if (done) {
      // The iterator's return value is the single value this subscriber emits.
      this.next(value)
      this.complete()
      return
    }

    // We're going to create an observable from the value and `subscribe` to
    // it. It is possible that our subscription's `next` method is called
    // before `subscribe` returns the subscription. This happens for
    // "synchronous" observables, e.g. arrays. In such cases, the `next`
    // method will have already saved a subscription for an observable later
    // in the generator's execution, and we don't want to override it with
    // our subscription. Thus, we save our subscription to a temporary
    // variable and copy it to `this.subscription` only after checking that
    // it does not already hold another subscription.
    //
    // The observables proposal adds a `start` method to the `Observer` type
    // that accepts the `Subscription` before `next`, `complete`, or `error`
    // are ever called. That will solve our problem, but no implementation is
    // yet available.
    //
    // Being able to cancel the subscription before it completes will let us
    // stop counting values and lose the `take` operator. It will let us
    // assume in our completion callback that no values have been sent.
    let count = 0
    const subscription = from(value)
      .pipe(take(1))
      .subscribe(
        value => {
          count += 1
          if (count > 1) {
            // This means `take` failed to cancel our subscription before
            // a second value was delivered.
            console.error('awaited observable returned too many values')
          }
          // Recurse.
          this.step('next', value)
        },
        error => this.step('throw', error),
        () => {
          if (count < 1) {
            // This stops progress. How should we handle it?
            console.error('awaited observable completed with no value')
          }
        },
      )

    // Keep the subscription only if it can still be cancelled: nothing newer
    // was stored by a synchronous recursion, and it did not already finish
    // while `subscribe` was running.
    if (!this.subscription && !subscription.closed) {
      this.subscription = subscription
    }
  }

  /**
   * Stop driving the iterator, cancel the observable currently being awaited
   * (if any), then run the base class teardown.
   */
  unsubscribe() {
    this.iterator = null
    if (this.subscription) {
      this.subscription.unsubscribe()
      this.subscription = null
    }
    super.unsubscribe()
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment