Skip to content

Instantly share code, notes, and snippets.

@mtho11
Created December 15, 2016 19:17
Show Gist options
  • Save mtho11/7004d203b1f9c84ec826c0e868ec86c3 to your computer and use it in GitHub Desktop.
Save mtho11/7004d203b1f9c84ec826c0e868ec86c3 to your computer and use it in GitHub Desktop.
Rx.js RateLimitOperator
//noprotect
console.clear();
const { Observable, Subject, Subscriber, Scheduler } = Rx;
function rateLimit(budget, time, shouldBuffer = false, scheduler = Scheduler.async) {
return this.lift(new RateLimitOperator(budget, time, shouldBuffer, scheduler));
}
class RateLimitOperator {
constructor(budget, time, shouldBuffer, scheduler) {
this.budget = budget;
this.time = time;
this.shouldBuffer = shouldBuffer;
this.scheduler = scheduler;
}
call(subscriber, source) {
return source._subscribe(
new RateLimitSubscriber(
subscriber, this.budget, this.time, this.shouldBuffer, this.scheduler
)
);
}
}
class RateLimitSubscriber extends Subscriber {
static resetCount({ subscriber }) {
subscriber.count = 0;
subscriber.clearTimeout();
const { buffer } = subscriber;
for (let i = 0; i < buffer.length; i++) {
const success = subscriber.maybeNext(buffer[i], false);
if (success) {
buffer.splice(i, 1);
i -= 1;
} else {
break;
}
}
}
constructor(destination, budget, time, shouldBuffer, scheduler) {
super(destination);
this.budget = budget;
this.time = time;
this.shouldBuffer = shouldBuffer;
this.scheduler = scheduler;
this.count = 0;
this.buffer = [];
}
clearTimeout() {
const { timeout } = this;
if (timeout) {
timeout.unsubscribe();
this.remove(timeout);
this.timeout = null;
}
}
_next(value) {
this.maybeNext(value);
}
maybeNext(value, shouldBuffer = this.shouldBuffer) {
if (!this.timeout) {
this.timeout = this.scheduler.schedule(
RateLimitSubscriber.resetCount,
this.time,
{ subscriber: this }
);
this.add(this.timeout);
}
if (this.count < this.budget) {
this.count++;
this.destination.next(value);
return true;
} else if (shouldBuffer) {
this.buffer.push(value);
}
return false;
}
}
Observable.prototype.rateLimit = rateLimit;
const request$ = new Subject();
request$
.rateLimit(2, 1000, true)
.subscribe(value => console.log('next: ' + value));
request$.next('first');
request$.next('second');
request$.next('third');
request$.next('forth');
request$.next('fifth');
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment