Skip to content

Instantly share code, notes, and snippets.

@mold
Last active August 21, 2022 20:46
Show Gist options
  • Save mold/e516738581a0b68423c65f44e1c3bca3 to your computer and use it in GitHub Desktop.
Save mold/e516738581a0b68423c65f44e1c3bca3 to your computer and use it in GitHub Desktop.
An rxjs combineLatest that takes an object of key/observable pairs and emits an object of key/values when any of the inner observables emits (typescript).
import { combineLatest, noop, Observable } from 'rxjs';
import { debounceTime, map, shareReplay, startWith, tap } from 'rxjs/operators';
export interface OperatorDict<X> {
[key: string]: Observable<X> | [Observable<X>, X];
}
/**
* Extracts the type `T` of an `Observable<T>`
*/
export type ExtractObservableType<A> = A extends Observable<infer B> ? B : never;
export interface ICombineLatestOptions {
/**
* Debounce the emitted value from combineLatest (with
* debounceTime(0))
*
* @default true
*/
debounce?: boolean;
/**
* Initialize every observable with null. Does not override any
* startWith value set in the original input object.
*
* @default false
*/
startWithNull?: boolean;
/**
* Logs the state on every new emission
*
* @default false
*/
logState?: boolean;
/**
* Share replays the whole thing.
*
* @default true
*/
shareReplay?: boolean;
}
const nop = <T>() => tap<T>(noop);
/**
* Takes a key/value object of observables or tuples:
*
* ```
* {
* obs1: of(123),
* obs2: [of("value").pipe(delay(1000)), "startWith value"],
* }
* ```
*
* and every time one of the source observables emits, emits an object
* with the latest value from all observables:
*
* ```
* {
* obs1: 123,
* obs2: "startWith value",
* }
* ```
* @param observables
* @param debounce
*/
export const combineLatestToObject = <
TIn extends OperatorDict<any>,
TOut extends { [K in keyof TIn]: ExtractObservableType<TIn[K] extends Array<any> ? TIn[K][0] : TIn[K]> }
>(
observables: TIn,
{
debounce = true,
startWithNull = false,
logState = false,
shareReplay: doShareReplay = true,
} = {} as ICombineLatestOptions,
): Observable<TOut> => {
const keys = Object.keys(observables);
return combineLatest(
keys.map(k => {
const obs = observables[k];
return Array.isArray(obs)
? obs[0].pipe(startWith(obs[1]))
: obs.pipe(startWithNull ? startWith(null) : nop());
}),
).pipe(
debounce ? debounceTime(0) : nop(),
map(b => b.reduce((acc, val, i) => ({ ...acc, [keys[i]]: val }), {})),
logState ? tap(state => console.log({ state })) : nop(),
// keep this at the end
doShareReplay ? shareReplay({ refCount: true, bufferSize: 1 }) : nop(),
);
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment