Skip to content

Instantly share code, notes, and snippets.

@vagarenko
Created February 5, 2018 12:22
Show Gist options
  • Save vagarenko/1ba97772990a9db761900fda6bc514dd to your computer and use it in GitHub Desktop.
Save vagarenko/1ba97772990a9db761900fda6bc514dd to your computer and use it in GitHub Desktop.
Implementation of the Functional Reactive Programming (FRP) concept of Behavior in RxJS
import { BehaviorSubject, Observable, IDisposable } from 'rx';
/**
 * Behavior represents a value that varies in time.
 *
 * It is similar to `BehaviorSubject` in that it always has a current value,
 * but it is not a `Subject`: values cannot be pushed into it from the outside.
 * It is bound to exactly one `Observable`, at construction time, and that
 * `Observable` is the only thing that changes its value.
 */
export class Behavior<T> implements IDisposable {
    /**
     * Create a behavior that never changes.
     *
     * @param value The permanent value of the behavior.
     * @returns A behavior driven by `Observable.never()`.
     */
    static constant<T>(value: T): Behavior<T> {
        return new Behavior(value, Observable.never<T>());
    }

    /** Holds the current value; every value of the stepper is forwarded into it. */
    private readonly _subject: BehaviorSubject<T>;

    /** Subscription that connects the stepper `Observable` to `_subject`. */
    private readonly _sub: IDisposable;

    /**
     * Create a `Behavior` from the initial value and the `Observable`.
     *
     * @param initialValue Value of the behavior until `stepper` emits.
     * @param stepper Observable whose notifications are forwarded to this
     *     behavior. It is subscribed to immediately.
     */
    constructor(initialValue: T, stepper: Observable<T>) {
        this._subject = new BehaviorSubject(initialValue);
        this._sub = stepper.subscribe(this._subject);
    }

    /** Current value of the behavior. */
    getValue(): T {
        return this._subject.getValue();
    }

    /**
     * Create a new behavior by applying given function to the values of this behavior.
     *
     * `fn` is invoked exactly once for the current value and once for every
     * later value, always with the value as its single argument.
     *
     * @param fn Mapping function.
     * @returns Behavior holding `fn` applied to this behavior's value.
     */
    map<R>(fn: (value: T) => R): Behavior<R> {
        const initial = fn(this.getValue());
        // `_subject` replays its current value to new subscribers. That value
        // has already been mapped above, so the replay is skipped; otherwise
        // `fn` (and its side effects, e.g. logging in `trace`) would run twice.
        // `fn` is wrapped so it never receives the extra `index`/`source`
        // arguments Rx passes to `map` selectors, matching the call above.
        const updates = this._subject.skip(1).map(value => fn(value));
        return new Behavior<R>(initial, updates);
    }

    /**
     * Combine two behaviors into one.
     *
     * @param other Behavior to pair with this one.
     * @returns Behavior holding the tuple `[thisValue, otherValue]`.
     */
    combine<T1>(other: Behavior<T1>): Behavior<[T, T1]> {
        return new Behavior<[T, T1]>(
            [this.getValue(), other.getValue()],
            this._subject.combineLatest(other._subject)
        );
    }

    /**
     * Apply a time-varying function to a time-varying value.
     *
     * @param behaviorFn Behavior holding the function to apply.
     * @returns Behavior holding the current function applied to the current value.
     */
    apply<R>(behaviorFn: Behavior<(t: T) => R>): Behavior<R> {
        return this.combine(behaviorFn).map(([t, fn]) => fn(t));
    }

    /**
     * When specified Observable emits a Behavior switch to it.
     *
     * @param obs Observable of behaviors to switch to.
     * @returns Behavior that starts out following this behavior and follows
     *     the most recently emitted behavior afterwards.
     */
    switchOn(obs: Observable<Behavior<T>>): Behavior<T> {
        return new Behavior(
            this.getValue(),
            obs.startWith(this).map(b => b._subject.asObservable()).switch()
        );
    }

    /**
     * `Observable` of the values of this `Behavior`.
     *
     * The returned observable is backed by a `BehaviorSubject`, so a new
     * subscriber first receives the current value and then every later value.
     */
    changes(): Observable<T> {
        return this._subject.asObservable();
    }

    /**
     * Log behavior changes to the console with optional message.
     *
     * Logging is a side effect of mapping: the current value is logged when
     * `trace` is called, and each later value is logged when it arrives.
     *
     * @param message Optional prefix; omitted from the output when falsy.
     * @returns A new behavior carrying the same values as this one.
     */
    trace(message?: string): Behavior<T> {
        return this.map(x => {
            if (message) {
                console.log(message, x);
            } else {
                console.log(x);
            }
            return x;
        });
    }

    /**
     * Dispose the subscription to the stepper `Observable`, so this behavior
     * no longer receives values from it.
     */
    dispose(): void {
        this._sub.dispose();
    }
}
/**
* Observable extensions.
*
* Augments the `Observable` interface of the 'rx' module with operators that
* work together with `Behavior`. The implementations are assigned to
* `Observable.prototype` further down in this file.
*/
declare module 'rx' {
interface Observable<T> {
/**
* Allow events only when Behavior is `true`.
* The behavior's value is read each time an event arrives.
*
* @param behavior Behavior acting as the gate.
* @returns Observable of the events that passed the gate.
*/
when(behavior: Behavior<boolean>): Observable<T>;
/**
* Allow events only when Behavior is `false`.
* The behavior's value is read each time an event arrives.
*
* @param behavior Behavior acting as the (inverted) gate.
* @returns Observable of the events that passed the gate.
*/
whenNot(behavior: Behavior<boolean>): Observable<T>;
/**
* Create a Behavior from this Observable starting from given value.
*
* @param initivalValue Value the Behavior holds until this Observable emits.
* @returns Behavior driven by this Observable.
*/
createBehavior(initivalValue: T): Behavior<T>;
/**
* Sample Behavior value at every event occurrence.
*
* @param behavior Behavior to sample.
* @returns Observable of `[event, behaviorValue]` pairs, one per event.
*/
sampleBehavior<T1>(behavior: Behavior<T1>): Observable<[T, T1]>;
/**
* Create a Behavior from this Observable by applying an accumulator function
* to its values.
*
* @param initivalValue Seed of the accumulation and initial value of the Behavior.
* @param accumulatorFn Combines the accumulated value with each event.
* @returns Behavior holding the accumulated value.
*/
scanBehavior<R>(
initivalValue: R,
accumulatorFn: (acc: R, val: T) => R
): Behavior<R>;
}
}
/**
 * Let events through only while the given Behavior holds `true`.
 * The Behavior is read each time an event arrives.
 */
Observable.prototype.when = function<T>(this: Observable<T>, behavior: Behavior<boolean>) {
    const gateIsOpen = () => behavior.getValue();
    return this.filter(gateIsOpen);
};
/**
 * Let events through only while the given Behavior holds `false`.
 * The Behavior is read each time an event arrives.
 */
Observable.prototype.whenNot = function<T>(this: Observable<T>, behavior: Behavior<boolean>) {
    const gateIsClosed = () => {
        const current = behavior.getValue();
        return !current;
    };
    return this.filter(gateIsClosed);
};
/**
 * Build a Behavior that starts at the given value and is driven by this
 * Observable from then on.
 */
Observable.prototype.createBehavior = function<T>(this: Observable<T>, initivalValue: T): Behavior<T> {
    const behavior: Behavior<T> = new Behavior(initivalValue, this);
    return behavior;
};
/**
 * Pair every event of this Observable with the value the Behavior holds
 * at the moment the event occurs.
 */
Observable.prototype.sampleBehavior = function<T, T1>(this: Observable<T>, behavior: Behavior<T1>): Observable<[T, T1]> {
    const pairWithSample = (event: T): [T, T1] => {
        const sampled = behavior.getValue();
        return [event, sampled];
    };
    return this.map<[T, T1]>(pairWithSample);
};
/**
 * Fold the values of this Observable with an accumulator function and
 * expose the running result as a Behavior that starts at the seed value.
 */
Observable.prototype.scanBehavior = function<T, R>(
    this: Observable<T>,
    initivalValue: R,
    accumulatorFn: (acc: R, val: T) => R
): Behavior<R> {
    // Running accumulation of the events, seeded with the initial value.
    const accumulated = this.scan(accumulatorFn, initivalValue);
    return accumulated.createBehavior(initivalValue);
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment