Skip to content

Instantly share code, notes, and snippets.

@bodil
Created January 16, 2014 22:36
Show Gist options
  • Save bodil/8464867 to your computer and use it in GitHub Desktop.
Save bodil/8464867 to your computer and use it in GitHub Desktop.
Don't.
/**
* Magic version of Rx.Observable.sync() that allows for adding more
* Observables to the input feed while processing.
*
* Needs a more descriptive name maybe.
*/
function wat<T,U>(initialState: T, initialFeeds: Rx.IObservable<U>[], iterator: (state: T, e: U) => StateChange<T,U>): any {
var sub = new Rx.BehaviorSubject(initialState);
var state = initialState;
function subscribe(stream: Rx.IObservable<U>): void {
stream.subscribe(tick, (error) => sub.onError(error));
}
function tick(e: U): void {
var stateChange = iterator(state, e);
sub.onNext(stateChange.state);
state = stateChange.state;
if (stateChange.newStreams) {
stateChange.newStreams.forEach(subscribe);
}
}
initialFeeds.forEach(subscribe);
return sub;
}
interface StateChange<T, U> {
state: T;
newStreams?: Rx.IObservable<U>[];
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment