Created
October 2, 2018 05:24
-
-
Save LexVocoder/fa06c491e5f48137ff268594e441328e to your computer and use it in GitHub Desktop.
Pull from observable (via callback)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Pull values from observables (instead of push-subscribe). | |
// A pull is like a one-time subscription. | |
// Typically, the onNext argument to pull invokes pull again before finishing. | |
// This negates the need for a while loop; | |
// besides, such a loop will pull multiple times without waiting for prior pulls to finish processing. | |
// There's a bug in this somewhere. A pull after a buffer overflow should first empty buffer, and then err. | |
// Instead, we get deadlock. Weird. | |
class PullSubscriber { | |
constructor(src, bufferSize) { | |
this.source = src; | |
this.started = false; | |
this.readyList = []; | |
this.bufferSize = bufferSize; | |
this.buffer = []; | |
this.closed = false; | |
this.hasErred = false; | |
this.error = undefined; | |
} | |
pull(onNext, onError, onCompletion) { | |
console.log("entering pull"); | |
const subFuncs = {onNext, onError, onCompletion}; | |
this.readyList.push(subFuncs); | |
if (!this.started) { | |
this.start(); | |
} | |
this.emptyBuffer(); | |
console.log("exiting pull"); | |
return; | |
} | |
start() { | |
this.started = true; | |
this.source.subscribe( | |
x => { | |
if (this.hasErred) return; | |
if (this.readyList.length > 0) { | |
const sub = this.readyList.shift(); | |
sub.onNext(x); | |
} else if (this.buffer.length > this.bufferSize) { | |
const err = new Error("PullSubscriber cannot receive value: buffer overflow"); | |
this.onError(err); | |
throw err; | |
} else { | |
this.buffer.push(x); | |
} | |
}, | |
err => { | |
this.onError(err); | |
}, | |
() => { | |
this.onCompletion(); | |
}); | |
} | |
onCompletion() { | |
this.closed = true; | |
this.emptyBuffer(); // ensures we don't complete folks until buffer is empty | |
} | |
onError(err) { | |
this.hasErred = true; | |
this.error = err; | |
this.emptyBuffer(); // don't signal errors until buffer is empty | |
} | |
emptyBuffer() { | |
while (this.buffer.length > 0 && this.readyList.length > 0) { | |
const x = this.buffer.shift(); | |
const sub = this.readyList.shift(); | |
sub.onNext(x); | |
} | |
console.log(`buffer.length === ${this.buffer.length}`); | |
if (this.buffer.length === 0) { | |
if (this.hasErred) { | |
console.log(`emptyBuffer: publishing error`); | |
this.readyList.forEach(sub => sub.onError(this.error)); | |
} else if (this.closed) { | |
console.log(`emptyBuffer: publishing completion`); | |
this.readyList.forEach(sub => sub.onCompletion()); | |
} | |
} | |
} | |
} | |
console.log("* finite elements..."); | |
let source=Rx.Observable.of([1]); | |
let pulley = new PullSubscriber(source, 0); | |
//output: 0,1,2,3,4,5...... | |
pulley.pull( | |
x => { console.log("first pull got value " + x); /*sub1.ready();*/ }, | |
err => console.log("first pull got error " + err), | |
() => console.log("first pull finished")); | |
pulley.pull( | |
x => { console.log("2nd pull got value " + x); /*sub1.ready();*/ }, | |
err => console.log("2nd pull got error " + err), | |
() => console.log("2nd pull finished")); | |
console.log("* infinite elements..."); | |
source = Rx.Observable.timer(1000, 1000); | |
pulley = new PullSubscriber(source, 0); | |
//output: 0,1,2,3,4,5...... | |
pulley.pull( | |
x => console.log("first pull got value " + x), | |
err => console.log("first pull got error " + err), | |
() => console.log("first pull finished")); | |
pulley.pull( | |
x => console.log("2nd pull got value " + x), | |
err => console.log("2nd pull got error " + err), | |
() => console.log("2nd pull finished")); | |
setTimeout(() => { | |
let ix = 3; | |
let doPull = () => { throw new Error("bootstrapping version of doPull called"); }; | |
doPull = () => | |
console.log("pulling..."); | |
pulley.pull( | |
x => { | |
console.log(`pull ${ix} got value ${x}`); | |
console.log("queueing pull..."); | |
setTimeout(doPull, 100); | |
}, | |
err => { console.log(`pull ${ix} got error ${err}`); }, | |
() => { console.log(`pull ${ix} finished`); }); | |
doPull(); | |
}, | |
5000); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment