Skip to content

Instantly share code, notes, and snippets.

@LexVocoder
Created October 2, 2018 04:22
Show Gist options
  • Save LexVocoder/77d0f30d4909e60f936b2ddbed7f1004 to your computer and use it in GitHub Desktop.
Save LexVocoder/77d0f30d4909e60f936b2ddbed7f1004 to your computer and use it in GitHub Desktop.
Round Robin Observable
// I originally made this, trying to solve the problem where I had to use Iterator instead of Observable
// when feeding data to concurrenct workers.
// I really think observable is the wrong thing here.
// Typically, Observables push the values to observers.
// As new values show up, it spawns however many concurrenct invocations of Observer.next it needs.
// Workers typically want to pull each value.
// So, Iterator makes sense.
// Hmm, but ... what if we wanted to pull values from an Observable?
// PullSubscriber coming up...
class RoundRobinObservable {
constructor(src, bufferSize) {
this.source = src;
this.started = false;
this.subscribers = [];
this.bufferSize = bufferSize;
this.buffer = [];
this.closed = false;
this.hasErred = false;
this.error = undefined;
}
subscribe(onNext, onError, onCompleted) {
const subFuncs = {onNext, onError, onCompleted};
this.subscribers.push(subFuncs);
if (!this.started) {
this.started = true;
this.source.subscribe(
x => {
if (this.subscribers.length > 0) {
const sub = this.subscribers.shift();
sub.onNext(x);
} else if (this.buffer.length > this.bufferSize) {
this.onError(new Error("RoundRobinEmitter cannot receive value: buffer overflow"));
throw new Error("RoundRobinEmitter cannot receive value: buffer overflow");
} else {
this.buffer.push(x);
}
},
err => {
this.onError(err);
},
() => {
this.closed = true;
});
}
return {
add: () => { throw new 'not implemented'; },
remove: () => { throw new 'not implemented'; },
unsubscribe: () => { throw new 'not implemented'; },
ready: () => {
this.setReady(subFuncs);
}
}
}
onError(err) {
this.hasErred = true;
this.error = err;
this.broadcastError();
}
emptyBuffer() {
while (this.buffer.length > 0 && this.subscribers.length > 0) {
const x = this.buffer.shift();
const sub = this.subscribers.shift();
sub.onNext(x);
}
if (this.closed && this.buffer.length === 0) {
this.closeAll();
}
}
broadcastError() {
// do we broacast one error to each? That seems excessive...
// Do we do this only for the ready subscribers (as shown), or for all of them?
this.subscribers.forEach(sub => sub.onError(this.error));
}
closeAll() {
this.subscribers.forEach(sub => sub.onCompleted());
}
setReady(sub) {
this.subscribers.push(sub);
if (this.hasErred) {
this.broadcastError();
} else {
this.emptyBuffer();
}
}
}
const source = Rx.Observable.timer(1000, 1000);
const robin = new RoundRobinObservable(source, 1);
//output: 0,1,2,3,4,5......
const sub1 = robin.subscribe(
x => { console.log("sub1 got value " + x); /*sub1.ready();*/ },
err => console.log("sub1 got error " + err),
() => console.log("sub1 finished"));
const sub2 = robin.subscribe(
x => { console.log("sub2 got value " + x); /*sub2.ready();*/ },
err => console.log("sub2 got error " + err),
() => console.log("sub2 finished"));
setTimeout(() => sub2.ready(), 8000);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment