Skip to content

Instantly share code, notes, and snippets.

@LexVocoder
Created October 2, 2018 05:24
Show Gist options
  • Save LexVocoder/fa06c491e5f48137ff268594e441328e to your computer and use it in GitHub Desktop.
Save LexVocoder/fa06c491e5f48137ff268594e441328e to your computer and use it in GitHub Desktop.
Pull from observable (via callback)
// Pull values from observables (instead of push-subscribe).
// A pull is like a one-time subscription.
// Typically, the onNext argument to pull invokes pull again before finishing.
// This negates the need for a while loop;
// besides, such a loop will pull multiple times without waiting for prior pulls to finish processing.
// There's a bug in this somewhere. A pull after a buffer overflow should first empty buffer, and then err.
// Instead, we get deadlock. Weird.
class PullSubscriber {
constructor(src, bufferSize) {
this.source = src;
this.started = false;
this.readyList = [];
this.bufferSize = bufferSize;
this.buffer = [];
this.closed = false;
this.hasErred = false;
this.error = undefined;
}
pull(onNext, onError, onCompletion) {
console.log("entering pull");
const subFuncs = {onNext, onError, onCompletion};
this.readyList.push(subFuncs);
if (!this.started) {
this.start();
}
this.emptyBuffer();
console.log("exiting pull");
return;
}
start() {
this.started = true;
this.source.subscribe(
x => {
if (this.hasErred) return;
if (this.readyList.length > 0) {
const sub = this.readyList.shift();
sub.onNext(x);
} else if (this.buffer.length > this.bufferSize) {
const err = new Error("PullSubscriber cannot receive value: buffer overflow");
this.onError(err);
throw err;
} else {
this.buffer.push(x);
}
},
err => {
this.onError(err);
},
() => {
this.onCompletion();
});
}
onCompletion() {
this.closed = true;
this.emptyBuffer(); // ensures we don't complete folks until buffer is empty
}
onError(err) {
this.hasErred = true;
this.error = err;
this.emptyBuffer(); // don't signal errors until buffer is empty
}
emptyBuffer() {
while (this.buffer.length > 0 && this.readyList.length > 0) {
const x = this.buffer.shift();
const sub = this.readyList.shift();
sub.onNext(x);
}
console.log(`buffer.length === ${this.buffer.length}`);
if (this.buffer.length === 0) {
if (this.hasErred) {
console.log(`emptyBuffer: publishing error`);
this.readyList.forEach(sub => sub.onError(this.error));
} else if (this.closed) {
console.log(`emptyBuffer: publishing completion`);
this.readyList.forEach(sub => sub.onCompletion());
}
}
}
}
console.log("* finite elements...");
let source=Rx.Observable.of([1]);
let pulley = new PullSubscriber(source, 0);
//output: 0,1,2,3,4,5......
pulley.pull(
x => { console.log("first pull got value " + x); /*sub1.ready();*/ },
err => console.log("first pull got error " + err),
() => console.log("first pull finished"));
pulley.pull(
x => { console.log("2nd pull got value " + x); /*sub1.ready();*/ },
err => console.log("2nd pull got error " + err),
() => console.log("2nd pull finished"));
console.log("* infinite elements...");
source = Rx.Observable.timer(1000, 1000);
pulley = new PullSubscriber(source, 0);
//output: 0,1,2,3,4,5......
pulley.pull(
x => console.log("first pull got value " + x),
err => console.log("first pull got error " + err),
() => console.log("first pull finished"));
pulley.pull(
x => console.log("2nd pull got value " + x),
err => console.log("2nd pull got error " + err),
() => console.log("2nd pull finished"));
setTimeout(() => {
let ix = 3;
let doPull = () => { throw new Error("bootstrapping version of doPull called"); };
doPull = () =>
console.log("pulling...");
pulley.pull(
x => {
console.log(`pull ${ix} got value ${x}`);
console.log("queueing pull...");
setTimeout(doPull, 100);
},
err => { console.log(`pull ${ix} got error ${err}`); },
() => { console.log(`pull ${ix} finished`); });
doPull();
},
5000);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment