Skip to content

Instantly share code, notes, and snippets.

@EvAlex
Created May 12, 2017 09:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save EvAlex/1939696d82a971a8d56429d93348dde2 to your computer and use it in GitHub Desktop.
Save EvAlex/1939696d82a971a8d56429d93348dde2 to your computer and use it in GitHub Desktop.
StatefulObserver
import { Observer, Observable, ReplaySubject, Subscriber } from 'rxjs/Rx';
/**
 * Minimal notification sink used for the targets that a StatefulObserver
 * forwards its notifications to.
 */
interface IObserver<T> {
    /** Delivers a value; the argument may be omitted. */
    next(value?: T): void;

    /** Delivers an error notification carrying `err`. */
    error(err: any): void;

    /** Delivers a completion notification. */
    complete(): void;
}
/**
 * Observer implementation that remembers what it has been notified of (latest
 * value, error, completion) and exposes that state through plain getters, which
 * makes it handy for use in data-binding.
 *
 * Every notification is also forwarded to the targets created through
 * {@link StatefulObserver.map} and {@link StatefulObserver.asObservable}.
 *
 * NOTE(review): `next`, `error` and `complete` are overridden without calling
 * `super`, so whatever the `Subscriber` base class does in those methods is
 * bypassed — confirm this is intentional.
 *
 * @typeParam TData   Type of the values this observer receives.
 * @typeParam TParams Type of the arbitrary parameters object carried alongside.
 */
export class StatefulObserver<TData, TParams> extends Subscriber<TData> implements Observer<TData> {
    private _params: TParams;
    private _value: TData = null;
    private _errorObject: any;
    private _receivedValue = false;
    private _receivedError = false;
    private _isComplete = false;
    /** Targets that every notification is forwarded to, each with its own value projection. */
    private _connected: { observer: IObserver<any>, project: (value: any) => any }[] = [];

    /** Parameters passed to the constructor (undefined when none were given). */
    get params(): TParams {
        return this._params;
    }

    /** Latest value passed to `next`, or `null` if none has been received yet. */
    get value(): TData {
        return this._value;
    }

    /** True once `next` has been called at least once. */
    public get receivedValue(): boolean {
        return this._receivedValue;
    }

    /** True while no value, no error and no completion has been received. */
    get waitingForValue(): boolean {
        return !this._receivedValue && !this._receivedError && !this._isComplete;
    }

    /** True while neither an error nor a completion has been received. */
    get willReceiveValue(): boolean {
        return !this._receivedError && !this._isComplete;
    }

    /** True once `error` has been called. */
    get receivedError(): boolean {
        return this._receivedError;
    }

    /** The argument of the latest `error` call, if any. */
    get errorObject(): any {
        return this._errorObject;
    }

    /** True once `complete` has been called. */
    get isComplete(): boolean {
        return this._isComplete;
    }

    /**
     * @param params Optional parameters object, exposed unchanged through `params`.
     */
    constructor(params?: TParams) {
        super();
        this._params = params;
    }

    /**
     * Stores `data` as the current value and forwards its projection to every
     * connected target.
     */
    next(data: TData): void {
        this._value = data;
        this._receivedValue = true;
        this._connected.forEach(e => e.observer.next(e.project(data)));
    }

    /**
     * Records the error and forwards it, unprojected, to every connected target.
     */
    error(error?: any): void {
        this._receivedError = true;
        this._errorObject = error;
        this._connected.forEach(e => e.observer.error(error));
    }

    /**
     * Records completion and forwards it to every connected target.
     */
    complete(): void {
        this._isComplete = true;
        this._connected.forEach(e => e.observer.complete());
    }

    /**
     * Creates a derived StatefulObserver that shares this observer's params and
     * receives every value passed through `project`.
     *
     * State that this observer has already received (latest value, error,
     * completion) is replayed into the derived observer right away, so a
     * derived observer created late does not stay in the "waiting" state.
     *
     * @param project Maps a value of this observer to a value of the derived one.
     * @returns The derived observer.
     */
    map<T>(project: (value: TData) => T): StatefulObserver<T, TParams> {
        const res = new StatefulObserver<T, TParams>(this._params);
        this.replayStateTo(res, project);
        this._connected.push({ observer: res, project });
        return res;
    }

    /**
     * Exposes this observer's notifications as an Observable, starting with
     * whatever state has already been received.
     *
     * NOTE(review): the ReplaySubject is created without a buffer size, so per
     * the RxJS documentation it retains every value forwarded to it. Consider
     * `new ReplaySubject<T>(1)` if only the latest value matters — this would
     * change what late subscribers see, so confirm with callers first.
     *
     * @param project Optional mapping applied to each value; defaults to identity.
     * @returns An Observable backed by a ReplaySubject connected to this observer.
     */
    asObservable<T>(project: (value: TData) => T = v => <any>v): Observable<T> {
        const res = new ReplaySubject<T>();
        this.replayStateTo(res, project);
        this._connected.push({ observer: res, project });
        return res.asObservable();
    }

    /** Pushes the already-received value / error / completion into `target`. */
    private replayStateTo<T>(target: IObserver<T>, project: (value: TData) => T): void {
        if (this._receivedValue) {
            target.next(project(this._value));
        }
        if (this._receivedError) {
            target.error(this._errorObject);
        }
        if (this._isComplete) {
            target.complete();
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment