Created
December 15, 2016 19:17
-
-
Save mtho11/7004d203b1f9c84ec826c0e868ec86c3 to your computer and use it in GitHub Desktop.
Rx.js RateLimitOperator
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//noprotect
// Start every run with an empty console so earlier output does not linger.
console.clear();

// Pull the RxJS pieces this file uses off the global `Rx` namespace.
const {
  Observable,
  Subject,
  Subscriber,
  Scheduler,
} = Rx;
/**
 * Observable operator: lifts the receiving observable (`this`) through a
 * RateLimitOperator built from the given settings.
 *
 * @param {number} budget - Maximum number of values forwarded per window.
 * @param {number} time - Window length, passed as the delay to the scheduler.
 * @param {boolean} [shouldBuffer=false] - When true, values over budget are
 *   queued for a later window instead of being discarded.
 * @param {Object} [scheduler=Scheduler.async] - Scheduler used to time windows.
 * @returns {*} Whatever `this.lift(...)` returns for the new operator.
 */
function rateLimit(budget, time, shouldBuffer = false, scheduler = Scheduler.async) {
  const operator = new RateLimitOperator(budget, time, shouldBuffer, scheduler);
  return this.lift(operator);
}
/**
 * Operator object passed to `lift` by `rateLimit`. It only stores the
 * rate-limit settings and, when invoked, subscribes a RateLimitSubscriber
 * configured with them to the source.
 */
class RateLimitOperator {
  /**
   * @param {number} budget - Maximum number of values forwarded per window.
   * @param {number} time - Window length, used as the scheduler delay.
   * @param {boolean} shouldBuffer - Whether over-budget values are queued.
   * @param {Object} scheduler - Scheduler used to time the windows.
   */
  constructor(budget, time, shouldBuffer, scheduler) {
    this.budget = budget;
    this.time = time;
    this.shouldBuffer = shouldBuffer;
    this.scheduler = scheduler;
  }

  /**
   * Wraps the downstream subscriber in a RateLimitSubscriber and subscribes
   * it to the source.
   *
   * @param {Object} subscriber - Downstream subscriber receiving the output.
   * @param {Object} source - Observable being rate limited.
   * @returns {*} The value returned by `source.subscribe(...)`.
   */
  call(subscriber, source) {
    // Use the public `subscribe` entry point rather than the private
    // `_subscribe` hook: if `source` is itself a lifted observable,
    // `_subscribe` can bypass the operator it carries.
    // NOTE(review): assumes RxJS >= 5.0.0-rc.1, where operators receive the
    // upstream observable - confirm against the Rx build this file loads.
    return source.subscribe(
      new RateLimitSubscriber(
        subscriber,
        this.budget,
        this.time,
        this.shouldBuffer,
        this.scheduler
      )
    );
  }
}
/**
 * Subscriber that forwards at most `budget` values to its destination per
 * `time` window. Values arriving once the budget is spent are either
 * discarded or, when `shouldBuffer` is true, queued and replayed in order as
 * later windows open.
 */
class RateLimitSubscriber extends Subscriber {
  /**
   * Scheduler callback that opens a new window: resets the counter, clears
   * the finished timer and replays as many buffered values as the fresh
   * budget allows.
   *
   * @param {{subscriber: RateLimitSubscriber}} state - Scheduled state
   *   carrying the subscriber whose window has elapsed.
   */
  static resetCount({ subscriber }) {
    subscriber.count = 0;
    subscriber.clearTimeout();
    const { buffer } = subscriber;
    // Replay from the head of the queue. A value is removed only after it has
    // been emitted, and draining stops as soon as the budget is spent again.
    // `false` stops `maybeNext` from re-queuing the value it failed to emit.
    while (buffer.length > 0) {
      if (!subscriber.maybeNext(buffer[0], false)) {
        break;
      }
      buffer.shift();
    }
    // The source completed while values were still queued; now that the
    // queue is empty the completion can finally be delivered.
    if (subscriber.completionDeferred && buffer.length === 0) {
      subscriber.completionDeferred = false;
      subscriber._complete();
    }
  }

  /**
   * @param {Object} destination - Downstream subscriber receiving the values.
   * @param {number} budget - Maximum number of values forwarded per window.
   * @param {number} time - Window length, used as the scheduler delay.
   * @param {boolean} shouldBuffer - Whether over-budget values are queued.
   * @param {Object} scheduler - Scheduler used to time the windows.
   */
  constructor(destination, budget, time, shouldBuffer, scheduler) {
    super(destination);
    this.budget = budget;
    this.time = time;
    this.shouldBuffer = shouldBuffer;
    this.scheduler = scheduler;
    this.count = 0;
    this.buffer = [];
    this.timeout = null;
    this.completionDeferred = false;
  }

  /** Cancels and forgets the pending window timer, if there is one. */
  clearTimeout() {
    const { timeout } = this;
    if (timeout) {
      timeout.unsubscribe();
      this.remove(timeout);
      this.timeout = null;
    }
  }

  /** Handles a value from the source using the configured buffering mode. */
  _next(value) {
    this.maybeNext(value);
  }

  /**
   * Handles completion of the source. If values are still buffered,
   * completion is postponed until `resetCount` has drained them; completing
   * immediately would tear down the window timer and silently drop them.
   */
  _complete() {
    if (this.buffer.length > 0) {
      this.completionDeferred = true;
      return;
    }
    super._complete();
  }

  /**
   * Emits `value` if the current window still has budget; otherwise
   * optionally queues it.
   *
   * @param {*} value - Value to emit or queue.
   * @param {boolean} [shouldBuffer=this.shouldBuffer] - Whether to queue the
   *   value when it cannot be emitted now.
   * @returns {boolean} True if the value was forwarded to the destination.
   */
  maybeNext(value, shouldBuffer = this.shouldBuffer) {
    // Start the window timer lazily, on the first value seen since the
    // previous window closed.
    if (!this.timeout) {
      this.timeout = this.scheduler.schedule(
        RateLimitSubscriber.resetCount,
        this.time,
        { subscriber: this }
      );
      this.add(this.timeout);
    }
    if (this.count < this.budget) {
      this.count++;
      this.destination.next(value);
      return true;
    }
    if (shouldBuffer) {
      this.buffer.push(value);
    }
    return false;
  }
}
// Make `rateLimit` available as a chainable method on every observable.
Observable.prototype.rateLimit = rateLimit;

// Demo: push five values synchronously through a limiter that allows two
// values per 1000 ms and buffers the overflow, logging each value as it is
// let through.
const request$ = new Subject();
const limited$ = request$.rateLimit(2, 1000, true);

limited$.subscribe((value) => {
  console.log('next: ' + value);
});

for (const label of ['first', 'second', 'third', 'forth', 'fifth']) {
  request$.next(label);
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment