Skip to content

Instantly share code, notes, and snippets.

@smart--petea
Last active February 10, 2017 08:04
Show Gist options
  • Save smart--petea/fabe081d43f5d070ddacba3abec4c0b9 to your computer and use it in GitHub Desktop.
Save smart--petea/fabe081d43f5d070ddacba3abec4c0b9 to your computer and use it in GitHub Desktop.
Buffer operator that emits batches based on elapsed time and item count
/// <reference path="typings/globals/es6-shim/index.d.ts" />
import { Observable } from "rxjs/Observable";
import { Observer } from "rxjs/Observer";
import { Subscriber } from "rxjs/Subscriber";
/**
 * Groups values from the source observable into arrays.
 *
 * Every value is handed to a `TimerBufferOperator`, which forwards batches to
 * the downstream subscriber according to `bufferSize` and `timeFrame`.
 *
 * @param bufferSize Item count at which a batch is flushed.
 * @param timeFrame Delay passed to the flush timer (a `setTimeout` delay, i.e. milliseconds).
 * @returns An observable whose notifications are arrays of source values.
 */
function timerBufferOperator<T>(this: Observable<T>, bufferSize: number, timeFrame: number): Observable<Array<T>> {
  return Observable.create((subscriber: Subscriber<Array<T>>) => {
    // Hand the source subscription back as the teardown logic; otherwise an
    // unsubscribe downstream would leave the source subscription alive.
    return this.subscribe(new TimerBufferOperator(subscriber, bufferSize, timeFrame));
  });
}
/**
 * Restartable one-shot timer.
 *
 * When the timeout elapses, `clbk` is invoked with `operator` as its `this`
 * value. The timer is armed immediately on construction.
 */
class CustomTimer<T> {
  /** Handle of the pending timeout, or `null` when nothing is scheduled. */
  private rawTimer: ReturnType<typeof setTimeout> | null = null;

  /**
   * @param operator Object used as `this` when `clbk` is invoked.
   * @param clbk Function to run when the timeout elapses.
   * @param timeFrame Delay handed to `setTimeout`.
   */
  constructor(private operator: TimerBufferOperator<T>, private clbk: () => void, private timeFrame: number) {
    this.start();
  }

  /** Arms the timer, replacing any timeout that is still pending. */
  public start(): void {
    // Cancel a pending timeout first; overwriting the handle would leak it
    // and let the callback fire twice.
    this.stop();
    this.rawTimer = setTimeout(() => {
      // Clear the handle before running the callback: the callback may re-arm
      // the timer, and resetting afterwards would discard that new handle.
      this.rawTimer = null;
      this.clbk.call(this.operator);
    }, this.timeFrame);
  }

  /** Cancels the pending timeout, if there is one. */
  public stop(): void {
    if (this.rawTimer === null) return;
    clearTimeout(this.rawTimer);
    this.rawTimer = null;
  }

  /** Cancels any pending timeout and arms a fresh one. */
  public restart(): void {
    this.stop();
    this.start();
  }
}
/**
 * Observer that accumulates source values and forwards them downstream in
 * batches.
 *
 * A batch is flushed when it holds `bufferSize` items or when the timer,
 * restarted on the first item of each batch, fires. Buffered values are also
 * flushed before an error or completion is forwarded.
 */
class TimerBufferOperator<T> implements Observer<T> {
  /** Set once the source has terminated or the downstream subscriber is closed. */
  closed: boolean = false;
  /** Values collected for the batch currently being built. */
  private list: Array<T>;
  private timer: CustomTimer<T>;

  /**
   * @param subscriber Downstream subscriber that receives the batches.
   * @param bufferSize Item count at which a batch is flushed.
   * @param timeFrame Delay handed to the flush timer.
   */
  constructor(private subscriber: Subscriber<Array<T>>, private bufferSize: number, timeFrame: number) {
    this.list = new Array<T>();
    // The arrow keeps `this` bound regardless of how the timer invokes it.
    this.timer = new CustomTimer(this, () => this.drain(), timeFrame);
  }

  /** Adds a value to the current batch and flushes once the batch is full. */
  public next(value: T): void {
    if (this.closed) return;
    if (this.subscriber.closed) {
      // Nobody is listening any more: stop buffering instead of growing forever.
      this.abandon();
      return;
    }
    // The time window is measured from the first item of each batch.
    if (this.list.length === 0) this.timer.restart();
    this.list.push(value);
    if (this.list.length >= this.bufferSize) this.drain();
  }

  /** Emits the current batch downstream, if there is anything to emit. */
  private drain(): void {
    if (this.subscriber.closed) {
      this.abandon();
      return;
    }
    if (this.list.length === 0) return;
    const batch = this.list;
    this.list = new Array<T>();
    // Stop the timer before emitting: a pending timeout would only fire on an
    // empty buffer, and a re-entrant next() during the emission may re-arm it.
    this.timer.stop();
    this.subscriber.next(batch);
  }

  /** Flushes what is buffered, then forwards the error downstream. */
  public error(err: any): void {
    this._close();
    this.subscriber.error(err);
  }

  /** Marks the operator closed, flushes the buffer and cancels the timer. */
  private _close(): void {
    this.closed = true;
    this.drain();
    this.timer.stop();
  }

  /** Flushes what is buffered, then forwards completion downstream. */
  public complete(): void {
    this._close();
    this.subscriber.complete();
  }

  /** Releases the buffer and the timer once the downstream subscriber is closed. */
  private abandon(): void {
    this.closed = true;
    this.list = new Array<T>();
    this.timer.stop();
  }
}
// Type side of the patch: merge the operator's signature into the
// `Observable` interface exported by "rxjs/Observable".
declare module "rxjs/Observable" {
  interface Observable<T> {
    timerBufferOperator: typeof timerBufferOperator;
  }
}

// Runtime side of the patch: install the operator on the prototype so it can
// be called as a method on any Observable instance.
Observable.prototype.timerBufferOperator = timerBufferOperator;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment