Skip to content

Instantly share code, notes, and snippets.

@nhusher
Created May 18, 2018 12:12
Show Gist options
  • Save nhusher/caff8f0f31d1b646ab55eaaf82a8ba9d to your computer and use it in GitHub Desktop.
Save nhusher/caff8f0f31d1b646ab55eaaf82a8ba9d to your computer and use it in GitHub Desktop.
/**
 * Adapts a push-based observable into a pull-based, synchronous generator
 * that yields one promise per emitted value.
 *
 * Values emitted before the consumer asks for them are buffered (as resolved
 * promises) and handed out in arrival order. When the consumer asks before a
 * value is available, a pending promise is yielded and settled by the next
 * emission or error.
 *
 * Error delivery: an error is delivered exactly once. If a yielded promise is
 * still pending, the oldest one is rejected with the error; otherwise the
 * error is thrown from the generator after all buffered values have been
 * yielded.
 *
 * NOTE: if the observable completes while a yielded promise is still pending,
 * that promise is never settled (the completion callback only records that
 * the source is done). Consumers that pull ahead of the source should not
 * await such a promise unconditionally.
 *
 * @param {{ subscribe: Function }} observable - Object whose
 *   `subscribe(onNext, onError, onComplete)` returns an object with an
 *   `unsubscribe()` method.
 * @yields {Promise<*>} A promise for each emitted value, in emission order.
 * @throws {*} The observable's error, when it arrives with no pending promise
 *   to reject.
 */
function * drainObservable (observable) {
  const buf = [] // promises for values that arrived before being requested
  const waiting = [] // settle functions for promises yielded ahead of the source
  let err = null
  let failed = false // separate flag so a falsy error value is still reported
  let done = false
  let subscription
  try {
    subscription = observable.subscribe(val => {
      if (waiting.length) waiting.shift().resolve(val)
      else buf.push(Promise.resolve(val))
    }, error => {
      done = true
      // `waiting` is an array and therefore always truthy; test its length so
      // an error with no pending consumer is stored instead of crashing.
      if (waiting.length) {
        waiting.shift().reject(error)
      } else {
        err = error
        failed = true
      }
    }, () => {
      done = true
    })
    while (true) {
      if (buf.length) {
        // Drain buffered values first, even if the source has already
        // completed or failed, so nothing emitted is lost.
        yield buf.shift()
      } else if (failed) {
        throw err
      } else if (done) {
        return
      } else {
        // Nothing available yet: hand out a pending promise and keep its
        // settle functions so the next emission (or error) can settle it.
        let resolve
        let reject
        const promise = new Promise((res, rej) => {
          resolve = res
          reject = rej
        })
        waiting.push({ resolve, reject })
        yield promise
      }
    }
  } finally {
    // Runs on normal completion, on a thrown error, and when the consumer
    // abandons the generator early (e.g. `break` out of a for...of loop).
    if (subscription) subscription.unsubscribe()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment