Skip to content

Instantly share code, notes, and snippets.

@dmorosinotto
Last active August 21, 2023 06:29
Show Gist options
  • Save dmorosinotto/0d3d27423e2e8e417ec8f22cc05c1905 to your computer and use it in GitHub Desktop.
Save dmorosinotto/0d3d27423e2e8e417ec8f22cc05c1905 to your computer and use it in GitHub Desktop.
computedFrom - Useful function to combine Signal, Observable, Promise -> toSignal + optional pipe operators chain //INSPIRED BY @Enea_Jahollari ARTICLE https://dev.to/this-is-angular/a-sweet-spot-between-signals-and-observables-4chb
//ORIGINAL CODE BY CHAU: https://gist.github.com/eneajaho/33a30bcf217c28b89c95517c07b94266
import { isSignal, Signal, untracked } from '@angular/core';
import { toObservable, toSignal } from '@angular/core/rxjs-interop';
import {
combineLatest,
distinctUntilChanged,
from,
isObservable,
ObservableInput,
of,
OperatorFunction,
take,
} from 'rxjs';
/**
 * A single source accepted by `computedFrom`: either an Angular `Signal`
 * or any RxJS `ObservableInput` producing the same value type.
 */
export type ObservableSignalInput<T> = Signal<T> | ObservableInput<T>;
/**
 * Maps every member of a tuple/object type `T` to a source of that member,
 * so that we can have `fn([Observable<A>, Signal<B>]): Observable<[A, B]>`.
 */
type ObservableSignalInputTuple<T> = {
  [Key in keyof T]: ObservableSignalInput<T[Key]>;
};
/**
 * Combines Signals, Observables and other `ObservableInput`s into one Signal.
 *
 * Every source is normalized to an Observable and the set is merged with
 * `combineLatest`; the optional `operator` is piped onto the combined stream.
 * The Signal's initial value is assembled per source:
 *  - a Signal contributes its current value (read with `untracked`);
 *  - an Observable contributes whatever it emits synchronously on subscribe,
 *    or `null` when it emits nothing synchronously (or emits `undefined`);
 *  - any other input (converted with `from`) contributes `null`.
 * When `operator` is given it is also run once over the initial values so the
 * initial value has the operator's output shape (if it emits synchronously).
 *
 * @param sources tuple or object map of Signals / ObservableInputs
 * @param operator optional operator applied to the combined values
 * @returns a Signal of the combined (and optionally transformed) values
 */
export function computedFrom<Input extends readonly unknown[], Output = Input>(
  sources: readonly [...ObservableSignalInputTuple<Input>],
  operator?: OperatorFunction<Input, Output>
): Signal<Output>;
export function computedFrom<Input extends object, Output = Input>(
  sources: ObservableSignalInputTuple<Input>,
  operator?: OperatorFunction<Input, Output>
): Signal<Output>;
export function computedFrom(
  sources: any,
  operator?: OperatorFunction<any, any>
): Signal<any> {
  // Subscribes just long enough to capture a synchronous emission, then tears
  // the subscription down. Without the immediate unsubscribe, a source that
  // only emits later would keep this probing subscription alive (and keep a
  // cold source running a second time) for no benefit.
  const peekSync = (source$: any, fallback: any): any => {
    let captured = fallback;
    source$
      .pipe(take(1))
      .subscribe((emitted: any) => {
        captured = emitted;
      })
      .unsubscribe();
    return captured;
  };

  const isTuple = Array.isArray(sources);
  const normalizedSources: any = isTuple ? [] : {};
  let initialValues: any = isTuple ? [] : {};

  for (const [keyOrIndex, source] of Object.entries<any>(sources)) {
    if (isSignal(source)) {
      normalizedSources[keyOrIndex] = toObservable(source);
      // untracked: reading the signal here must not register a dependency.
      initialValues[keyOrIndex] = untracked(source);
    } else if (isObservable(source)) {
      normalizedSources[keyOrIndex] = source.pipe(distinctUntilChanged());
      initialValues[keyOrIndex] = peekSync(source, null) ?? null;
    } else {
      normalizedSources[keyOrIndex] = from(source as any).pipe(
        distinctUntilChanged()
      );
      initialValues[keyOrIndex] = null;
    }
  }

  let combined: any = combineLatest(normalizedSources);
  if (operator) {
    combined = combined.pipe(operator);
    // Derive the initial value in the operator's output shape; if the operator
    // does not emit synchronously the raw initial values are kept.
    initialValues = peekSync(operator(of(initialValues)), initialValues);
  }
  return toSignal(combined, { initialValue: initialValues });
}
//ORIGINAL CODE BY ENEA: https://gist.github.com/eneajaho/dd74aeecb877069129e269f912e6e472
import { Signal, isSignal } from '@angular/core';
import { toSignal, toObservable } from '@angular/core/rxjs-interop';
import {
from,
isObservable,
Observable,
ObservableInput,
OperatorFunction,
} from 'rxjs';
/**
 * Turns a Signal, Promise or Observable into a Signal, optionally running the
 * values through an RxJS operator first.
 *
 * - Signal source: the second argument is the operator.
 * - Promise source: an initial value is required, the operator is optional.
 * - Observable source: both initial value and operator are optional.
 *
 * Argument decoding is delegated to `toPipeableArgs`; the initial value it
 * returns is handed to `toSignal` unchanged (it is NOT passed through the
 * operator).
 */
export function computed$<TValue, TReturn = TValue>(
  signal: Signal<TValue>,
  operator: OperatorFunction<TValue, TReturn>
): Signal<TReturn>;
export function computed$<TValue, TReturn = TValue>(
  promise: Promise<TValue>,
  initialValue: TValue,
  operator?: OperatorFunction<TValue, TReturn>
): Signal<TReturn>;
export function computed$<TValue, TReturn = TValue>(
  observable: Observable<TValue>,
  initialValue?: TValue,
  operator?: OperatorFunction<TValue, TReturn>
): Signal<TReturn>;
export function computed$<TValue, TReturn = TValue>(
  source: ObservableInput<TValue> | Signal<TValue>,
  initialValueOrOperator?: TValue | OperatorFunction<TValue, TReturn>,
  operator?: OperatorFunction<TValue, TReturn>
): Signal<TReturn> {
  const pipeable = toPipeableArgs(source, initialValueOrOperator, operator);
  const source$ = pipeable[0];
  const pipeOperator = pipeable[1];
  const seed = pipeable[2];

  // With an operator: pipe first, then convert.
  if (pipeOperator) {
    return toSignal(source$.pipe(pipeOperator), {
      initialValue: seed as TReturn,
    }) as Signal<TReturn>;
  }
  // Without one: convert the source stream as-is.
  return toSignal(source$, { initialValue: seed }) as Signal<TReturn>;
}
/**
 * Decodes the overloaded `computed$` arguments into a uniform triple:
 * `[source as Observable, optional operator, optional initial value]`.
 *
 * - Signal source: the second argument is the operator and the signal's
 *   current value becomes the initial value.
 * - Promise / thenable source: the second argument must be an initial value
 *   (not `undefined`, not a function); the third is the operator.
 * - Observable or any other `ObservableInput`: second argument is the initial
 *   value, third is the operator.
 *
 * @throws Error when a Promise/thenable source is given without an initial value
 */
function toPipeableArgs<TValue, TReturn = TValue>(
  source: ObservableInput<TValue> | Signal<TValue>,
  initialValueOrOperator?: TValue | OperatorFunction<TValue, TReturn>,
  operator?: OperatorFunction<TValue, TReturn>
): [Observable<TValue>, OperatorFunction<TValue, TReturn>?, TValue?] {
  if (typeof source === 'function' && isSignal(source)) {
    return [
      toObservable(source),
      initialValueOrOperator as OperatorFunction<TValue, TReturn>,
      source() as TValue,
    ];
  }

  // Duck-type thenables without the `in` operator: `'then' in source` throws a
  // TypeError for primitive inputs (e.g. a string, which is a valid iterable
  // ObservableInput) and for null.
  const candidate: unknown = source;
  const isThenable =
    candidate instanceof Promise ||
    (candidate !== null &&
      (typeof candidate === 'object' || typeof candidate === 'function') &&
      typeof (candidate as { then?: unknown }).then === 'function');

  if (isThenable) {
    if (
      initialValueOrOperator === undefined ||
      typeof initialValueOrOperator === 'function'
    )
      throw new Error(`computed$ with Promise expects an initialValue`);
    return [
      from(source as ObservableInput<TValue>),
      operator,
      initialValueOrOperator as TValue,
    ];
  }
  if (isObservable(source)) {
    return [source, operator, initialValueOrOperator as TValue];
  }
  return [
    from(source as ObservableInput<TValue>),
    operator,
    initialValueOrOperator as TValue,
  ];
}
import { isSignal, Signal, untracked } from '@angular/core';
import { toObservable, toSignal } from '@angular/core/rxjs-interop';
import {
combineLatest,
distinctUntilChanged,
from,
identity,
startWith,
isObservable,
type ObservableInput,
type ObservableInputTuple,
type OperatorFunction,
} from 'rxjs';
/**
 * A single source accepted by `computedFrom`: either an Angular `Signal`
 * or any RxJS `ObservableInput` producing the same value type.
 */
export type ObservableSignalInput<T> = Signal<T> | ObservableInput<T>;
/**
 * Maps every member of a tuple/object type `T` to a source of that member,
 * so that we can have `fn([Observable<A>, Signal<B>]): Observable<[A, B]>`.
 */
type ObservableSignalInputTuple<T> = {
  [Key in keyof T]: ObservableSignalInput<T[Key]>;
};
/**
 * Combines Signals / Observables / other ObservableInputs (tuple or object
 * map) into one Signal, optionally piping the combined values through an
 * operator.
 *
 * Accepted call shapes: `(sources)`, `(sources, operator)`,
 * `(sources, initialValue)` and `(sources, operator, initialValue)`.
 *
 * - With an initial value, `toSignal` is called with `initialValue`, so
 *   sources may emit late (truly async).
 * - Without one, `toSignal` is called with `requireSync: true`, so every
 *   source must emit synchronously (e.g. via `startWith`) or Angular throws.
 *
 * Any error thrown while creating the signal is logged and rethrown unchanged.
 */
export function computedFrom<Input extends readonly unknown[], Output = Input, Default = Output>(
  sources: readonly [...ObservableSignalInputTuple<Input>],
  operator?: OperatorFunction<Input, Output>
): Signal<Output | Default>;
export function computedFrom<Input extends readonly unknown[], Output = Input, Default= Output>(
  sources: readonly [...ObservableSignalInputTuple<Input>],
  initialValue?: Default
): Signal<Output | Default>;
export function computedFrom<Input extends readonly unknown[], Output = Input, Default = Output>(
  sources: readonly [...ObservableSignalInputTuple<Input>],
  operator?: OperatorFunction<Input, Output>,
  initialValue?: Default
): Signal<Output | Default>;
export function computedFrom<Input extends object, Output = Input, Default = Output>(
  sources: ObservableSignalInputTuple<Input>,
  operator?: OperatorFunction<Input, Output>
): Signal<Output | Default>;
export function computedFrom<Input extends object, Output = Input, Default = Output>(
  sources: ObservableSignalInputTuple<Input>,
  initialValue?: Default
): Signal<Output | Default>;
export function computedFrom<Input extends object, Output = Input, Default = Output>(
  sources: ObservableSignalInputTuple<Input>,
  operator?: OperatorFunction<Input, Output>,
  initialValue?: Default
): Signal<Output | Default>;
export function computedFrom<Input = any, Output = Input, Default = Output>(
  ...args: any[]
): Signal<Output | Default> {
  const { normalizedSources, hasInitValue, operator, initialValue } =
    _normalizeArgs<Input, Output, Default>(args);
  try {
    // With an initialValue the resulting Signal works even if the source
    // Observables emit late. Without it, requireSync:true makes toSignal
    // enforce a synchronous emission, so a forgotten startWith surfaces as an
    // error instead of a silent `undefined`.
    const ret: Signal<Output | Default> = hasInitValue
      ? toSignal(combineLatest(normalizedSources).pipe(operator), { initialValue: initialValue as any }) // cast works around a TS overload mismatch on `Default | undefined`
      : toSignal(combineLatest(normalizedSources).pipe(operator), { requireSync: true });
    return ret;
  } catch (e: unknown) {
    // The thrown value is not guaranteed to be an Error: read message/code
    // defensively so the diagnostics below can never mask the original error.
    const rawMessage = (e as { message?: unknown } | null | undefined)?.message;
    const message = typeof rawMessage === 'string' ? rawMessage : String(e);
    const code = (e as { code?: unknown } | null | undefined)?.code;
    const isRequireSyncCode =
      (typeof code === 'number' || typeof code === 'string') &&
      Math.abs(Number(code)) === 601;
    // Angular formats runtime error codes as "NG0601"; the un-padded "NG601"
    // is still accepted for backward compatibility.
    if (message.includes("requireSync") || /NG0?601/.test(message) || isRequireSyncCode)
      console.warn("Some observable sources doesn't emit sync value, please pass initialValue to computedFrom, or use startWith operator to ensure initial sync value for all your sources!")
    else console.error(`computedFrom problem converting toSignal - Details:\n${e}`);
    throw e
  }
}
/**
 * Decodes the variadic `computedFrom` arguments.
 *
 * Accepted shapes: `[sources]`, `[sources, operator]`,
 * `[sources, initialValue]`, `[sources, operator, initialValue]`.
 * A second argument is treated as the operator exactly when it is a function;
 * when no operator is given `identity` is used. The input array is not
 * modified.
 *
 * Each source is converted to an Observable:
 *  - Signal: `toObservable` prefixed with the signal's current value so it
 *    emits synchronously;
 *  - Observable: the source with `distinctUntilChanged`;
 *  - anything else: `from(source)` with `distinctUntilChanged`.
 *
 * @returns the normalized sources (array or object, mirroring the input), the
 *   operator, whether an initial value was supplied, and that initial value
 * @throws Error when sources are missing/not an object, or when three
 *   arguments are given and the second is not a function
 */
function _normalizeArgs<Input, Output, Default>(
  args: any[]
): {
  normalizedSources: ObservableInputTuple<Input>;
  operator: OperatorFunction<Input, Output>;
  hasInitValue: boolean;
  initialValue: Default | undefined;
} {
  const sources = args?.[0];
  // `typeof null === 'object'`, so null must be rejected explicitly; otherwise
  // it would reach Object.entries and fail with an unrelated TypeError.
  if (!args || !args.length || sources === null || typeof sources !== 'object')
    throw new Error('computedFrom need sources');
  const hasOperator = typeof args[1] == 'function';
  if (args.length == 3 && !hasOperator)
    throw new Error('computedFrom need pipebale operator as second arg');
  const hasInitValue = args.length == 3 || (args.length == 2 && !hasOperator);
  // Without an operator the initial value (if any) sits in slot 1.
  const operator: OperatorFunction<Input, Output> = hasOperator ? args[1] : identity;
  const initialValue: Default | undefined = hasOperator ? args[2] : args[1];

  const normalizedSources: any = Array.isArray(sources) ? [] : {};
  for (const [keyOrIndex, source] of Object.entries<any>(sources)) {
    if (isSignal(source)) {
      normalizedSources[keyOrIndex] = toObservable(source).pipe(startWith(untracked(source)));
    } else if (isObservable(source)) {
      normalizedSources[keyOrIndex] = source.pipe(distinctUntilChanged());
    } else {
      normalizedSources[keyOrIndex] = from(source as any).pipe(distinctUntilChanged());
    }
  }
  return { normalizedSources, hasInitValue, operator, initialValue };
}
//ORIGINAL CODE BY ENEA: https://gist.github.com/eneajaho/53c0eca983c1800c4df9a5517bdb07a3
import { isSignal, signal, Signal } from '@angular/core';
import { toObservable, toSignal } from '@angular/core/rxjs-interop';
import {
combineLatest,
distinctUntilChanged,
from,
interval,
isObservable,
Observable,
ObservableInput,
of,
OperatorFunction,
} from 'rxjs';
/**
 * A single source accepted by `computedFrom`: either an Angular `Signal`
 * or any RxJS `ObservableInput` producing the same value type.
 */
export type ObservableSignalInput<T> = Signal<T> | ObservableInput<T>;
/**
 * Maps every member of a tuple/object type `T` to a source of that member,
 * so that we can have `fn([Observable<A>, Signal<B>]): Observable<[A, B]>`.
 */
type ObservableSignalInputTuple<T> = {
  [Key in keyof T]: ObservableSignalInput<T[Key]>;
};
/**
 * Combines a tuple of Signals / Observables / other ObservableInputs with
 * `combineLatest` and exposes the result as a Signal.
 *
 * The second argument is treated as the operator when it is a function,
 * otherwise as the initial value (with the operator in third position).
 * Each source is de-duplicated with `distinctUntilChanged` before combining.
 * When no operator is supplied the combined tuple itself is the signal value.
 *
 * @param sources tuple of sources to combine
 * @param initialValueOrOperator initial signal value, or the operator
 * @param operator operator applied to the combined tuple
 */
export function computedFrom<R, A extends readonly unknown[]>(
  sources: readonly [...ObservableSignalInputTuple<A>],
  operator: OperatorFunction<A, R>
): Signal<R | undefined>;
export function computedFrom<R, A extends readonly unknown[]>(
  sources: readonly [...ObservableSignalInputTuple<A>],
  initialValue: R,
  operator?: OperatorFunction<A, R>
): Signal<R>;
export function computedFrom<R, A extends readonly unknown[]>(
  sources: readonly [...ObservableSignalInputTuple<A>],
  initialValue?: R,
  operator?: OperatorFunction<A, R>
): Signal<R | undefined>;
export function computedFrom<R, A extends readonly unknown[]>(
  sources: readonly [...ObservableSignalInputTuple<A>],
  initialValueOrOperator?: R | OperatorFunction<A, R>,
  operator?: OperatorFunction<A, R>
): Signal<R> {
  const obsSources = sources.map((x) => {
    const obs$ = sourceToObservable(x);
    return obs$.pipe(distinctUntilChanged());
  });

  const secondArgIsOperator = typeof initialValueOrOperator === 'function';
  const initialValue = secondArgIsOperator ? undefined : initialValueOrOperator;
  const pipeOperator = (secondArgIsOperator
    ? initialValueOrOperator
    : operator) as unknown as OperatorFunction<any, R> | undefined;

  const combined$ = combineLatest(obsSources);
  // The operator is optional in two of the overloads. `pipe(undefined)` would
  // throw at call time, so only pipe when an operator was actually supplied.
  const result$ = pipeOperator
    ? combined$.pipe(pipeOperator)
    : (combined$ as unknown as Observable<R>);
  return toSignal(result$, { initialValue }) as Signal<R>;
}
/**
 * Normalizes one `computedFrom` source to an Observable:
 * a Signal goes through `toObservable`, an Observable is returned untouched,
 * and every other input is wrapped with `from`.
 */
function sourceToObservable<T>(
  source: ObservableSignalInput<T>
): Observable<T> {
  if (typeof source !== 'function' || !isSignal(source)) {
    // Not a signal: either already an Observable or something `from` accepts.
    return isObservable(source) ? source : from(source);
  }
  return toObservable(source);
}
import 'zone.js/dist/zone';
import { Component, computed, effect, inject, signal } from '@angular/core';
import { CommonModule } from '@angular/common';
import { bootstrapApplication } from '@angular/platform-browser';
import { ActivatedRoute, provideRouter, RouterModule } from '@angular/router';
import { GalleryService } from './gallery.service';
import { GlobalFiltersService } from './global-filters.service';
import {
filter,
map,
pipe,
startWith,
switchMap,
of,
tap,
timer,
auditTime,
debounceTime,
withLatestFrom,
} from 'rxjs';
import { computed$ } from './computed$';
// import { computedFrom } from './Enea_computedFrom'; //ONLY ARRAY SYNTAX
// import { computedFrom } from './Chau_computedFrom'; //ARRAY + OBJECT SYNTAX THAT REQUIRE Observable SYNC EMIT + GET INITIAL VALUE FOR SOURCES
import { computedFrom } from './computedFrom'; //THIS IS MINE SOLUTION THAT ENFORCE Sync Observable WITH NATIVE toSignal+requiredSync OR CAN PASS initialValue TO NOT ERROR IF Observale EMIT LATER (real async ;-)
/**
 * Root shell component of the demo: renders two router links (`/1`, `/2`),
 * a button that calls `filterService.changeType()`, and the router outlet
 * where the routed component is displayed.
 */
@Component({
selector: 'my-app',
standalone: true,
imports: [RouterModule],
template: `
<nav>
<a routerLink="/1">id=1</a> |
<a routerLink="/2">id=2</a> |
<button (click)="filterService.changeType()">changeType</button>
</nav>
<hr/>
<router-outlet></router-outlet>
`,
})
export class App {
// Public (not private) because the template above calls filterService.changeType().
filterService = inject(GlobalFiltersService);
}
//INSPIRED BY @Enea_Jahollari ARTICLE https://dev.to/this-is-angular/a-sweet-spot-between-signals-and-observables-4chb
/**
 * Routed demo component showing how `computedFrom` (and, in the commented-out
 * alternative, `computed$`) can build Signals from a mix of Signals and
 * Observables. The template toggles `showStars` and renders `data`,
 * `favoritesCount` and `combined`.
 */
@Component({
selector: 'my-cmp',
standalone: true,
imports: [CommonModule],
template: `
<button (click)="showStars.set(!showStars())">
Toggle show stars to {{ !showStars() }}
</button>
<pre>
{{ data() | json }}
</pre>
<div>
Favorites count: {{ favoritesCount() }}
<pre>combined={{ combined() | json }}</pre>
</div>
`,
})
export class Cmp {
private route = inject(ActivatedRoute);
private galleryService = inject(GalleryService);
private filterService = inject(GlobalFiltersService);
constructor() {
// Logs every new value of the `data` signal.
effect(() => console.log('Data Value:', this.data()));
}
// Stream of the `id` route parameter.
galleryId$ = this.route.paramMap.pipe(map((p) => p.get('id')!));
// Toggled by the button in the template.
showStars = signal(false);
// Gallery items for the current id / filters / showStars combination.
data =
//USE computed$ TO APPLY OPERATORS TO Signal | Observable | Promise
// computed$(
// this.filterService.filters,
// pipe(
// tap((filter) => console.info('Now filter', filter)),
// debounceTime(500),
// tap((debounced) => console.warn('Debounced', debounced)),
// withLatestFrom(this.galleryId$),
// switchMap(([filterDebounced, id]) =>
// this.galleryService.getGalleryItems(id, filterDebounced)
// )
// )
// );
//USE computedFrom TO COMBINE Signal AND Observable + APPLY OPERATORS
computedFrom(
[
this.galleryId$.pipe(filter((x) => !!x)),
this.filterService.filters,
this.showStars,
],
pipe(
switchMap(([id, filters, showStars]) =>
this.galleryService.getGalleryItems(id!, filters, showStars)
),
startWith([]) // this will be the initial value of data
)
//,[] //YOU CAN REMOVE startWith FROM ABOVE AND USE THIS initialValue
);
// Derived from `data`; getFavoritesCount returns -1 when data is not an array.
favoritesCount = computed(() => getFavoritesCount(this.data()));
// Signal with default value
first = signal(1);
// Observable that emits first value synchronously
second$ = of('1');
// Observable that emits first value asynchronously
third$ = timer(5000, 1000); //.pipe(startWith(-42));
//TEST combined
// Passes 42 as initialValue because third$ does not emit synchronously.
combined =
//ARRAY SYNTAX BY ENEA -> infer Signal<number>
computedFrom(
[this.first, this.second$, this.third$],
42
// pipe(
// map(([f, s, t]) => f + +s + t),
// auditTime(500) //TRY WITH 1000 OR 1500 ;-)
// startWith(123) //ERROR IF COMMENTED AND YOU DON'T PASS initialValue
// )
);
//OBJECT SYNTAX BY CHAU -> infer Signal<string>
// computedFrom(
// {
// first: this.first,
// second: this.second$,
// third: this.third$, //.pipe(startWith(-42)),
// },
// map(({ first, second, third }) => first + second + third),
// 'This will be the initialValue' //IF YOU REMOVE THIS WILL ERROR
// );
}
/**
 * Counts the items whose `favorite` property is truthy.
 *
 * @param data list of items; may be missing or not an array
 * @returns the number of favorite items, or -1 when `data` is not an array
 */
function getFavoritesCount(data?: any[]): number {
  if (!Array.isArray(data)) return -1;
  // Explicit boolean coercion (the previous `x.favorite!!` was a double
  // non-null assertion, not `!!x.favorite`); `?.` tolerates null/undefined items.
  return data.filter((item) => Boolean(item?.favorite)).length;
}
// Bootstraps the standalone App component with a router:
// the empty path redirects to '/42', and ':id' renders Cmp.
bootstrapApplication(App, {
providers: [
provideRouter([
{ path: '', redirectTo: '/42', pathMatch: 'full' },
{ path: ':id', component: Cmp },
]),
],
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment