Created
July 21, 2023 16:16
-
-
Save dmorosinotto/f43bceaf57819e98bd8cab46a9c3d998 to your computer and use it in GitHub Desktop.
suspensify() - RxJS operator that converts a possibly failing Obs$<T> into an Obs$ that never fails and instead emits a "state" object { pending, finalized, hasValue + value: T, hasError + error: any }
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//ORIGINAL CODE https://github.com/jscutlery/devkit/tree/main/packages/operators | |
import { | |
MonoTypeOperatorFunction, | |
Observable, | |
ObservableNotification, | |
OperatorFunction, | |
ReplaySubject, | |
} from 'rxjs'; | |
import { debounce, map, materialize, scan, startWith } from 'rxjs/operators'; | |
/**
 * "Lax" suspense state: a single flat shape in which `value` and `error` are
 * always present as keys (possibly `undefined`), so consumers can read them
 * without first narrowing on the status flags.
 */
export interface SuspenseLax<T> {
  /** True while no value, error or completion has been observed yet. */
  pending: boolean;
  /** True once the source has errored or completed. */
  finalized: boolean;
  /** True when `value` holds the latest emitted value. */
  hasValue: boolean;
  /** Latest emitted value, or `undefined` when there is none. */
  value: T | undefined;
  /** True when the source has errored. */
  hasError: boolean;
  /** Error raised by the source, or `undefined` when there is none. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  error: any | undefined;
}
/**
 * Strict state before the source has produced any value, error or completion.
 * Every flag except `pending` is `false`.
 */
export interface SuspensePending {
  pending: true;
  finalized: false;
  hasValue: false;
  hasError: false;
}
/**
 * Strict state once the source has emitted at least one value.
 * `finalized` is `false` while the source is still open and becomes `true`
 * after it completes; the last value is retained in both cases.
 */
export interface SuspenseWithValue<T> {
  pending: false;
  finalized: boolean;
  hasValue: true;
  /** Most recently emitted value. */
  value: T;
  hasError: false;
}
/**
 * Strict state after the source has errored.
 * An error always terminates the source, hence `finalized` is always `true`.
 */
export interface SuspenseWithError {
  pending: false;
  finalized: true;
  hasValue: false;
  hasError: true;
  /** The error raised by the source; narrow it before use. */
  error: unknown;
}
/**
 * Strict state for a source that completed without ever emitting a value
 * or an error.
 */
export interface SuspenseEmpty {
  pending: false;
  finalized: true;
  hasValue: false;
  hasError: false;
}
/**
 * Strict suspense state: a union of the four possible states.
 * Narrow on the `pending` / `hasValue` / `hasError` flags to access
 * `value` or `error` safely.
 */
export type Suspense<T> =
  | SuspensePending
  | SuspenseWithValue<T>
  | SuspenseWithError
  | SuspenseEmpty;
/** Options accepted by `suspensify`. */
export interface SuspensifyOptions {
  /**
   * When `true` (the default used by `suspensify`), states are emitted as the
   * strict `Suspense<T>` union; when `false`, as the flat `SuspenseLax<T>` shape.
   */
  strict?: boolean;
}
/**
 * Creates an operator that derives a suspense state from the source observable.
 *
 * Every source notification (next, error, complete) is turned into a state
 * object, so a source error is reported as a state with `hasError: true`
 * rather than as an error notification. The stream starts with the
 * {@link pending} state, and states produced synchronously on subscription
 * are coalesced into the last one.
 *
 * @example source$.pipe(suspensify())
 * @example source$.pipe(suspensify({ strict: false }))
 *
 * @param options `strict: true` (default) emits the `Suspense<T>` union;
 * `strict: false` emits `SuspenseLax<T>`, where the `value` and `error` keys
 * are always present.
 * @returns an `OperatorFunction<T, Suspense<T>>` in strict mode, or an
 * `OperatorFunction<T, SuspenseLax<T>>` in lax mode.
 */
export function suspensify<T>(options: {
  strict: false;
}): OperatorFunction<T, SuspenseLax<T>>;
export function suspensify<T>(options?: {
  strict: true;
}): OperatorFunction<T, Suspense<T>>;
export function suspensify<T>({
  strict = true,
}: SuspensifyOptions = {}): OperatorFunction<T, SuspenseLax<T> | Suspense<T>> {
  return (
    source$: Observable<T>
  ): Observable<SuspenseLax<T> | Suspense<T>> => {
    const strictSuspense$ = source$.pipe(
      _suspensify<T>(),
      _coalesceFirstEmittedValue()
    );

    if (strict) {
      return strictSuspense$;
    }

    /* Lax mode: make sure `error` and `value` keys always exist. The defaults
     * come first so that the actual state's own keys override them. */
    return strictSuspense$.pipe(
      map(
        (strictSuspense): SuspenseLax<T> => ({
          error: undefined,
          value: undefined,
          ...strictSuspense,
        })
      )
    );
  };
}
/**
 * Initial suspense state: nothing has been received from the source yet.
 *
 * Exported for the cases where a consumer has to provide the initial state
 * itself, for instance when {@link suspensify} is applied to inner observables.
 *
 * @example
 * toSignal(
 *   source$.pipe(switchMap(() => fetchData().pipe(suspensify()))),
 *   { initialValue: pending }
 * );
 */
export const pending: SuspensePending = {
  finalized: false,
  hasError: false,
  hasValue: false,
  pending: true,
};
/**
 * Collapses everything the source emits synchronously while being subscribed
 * to into a single emission (the last one); later emissions pass through.
 *
 * It works by debouncing on a "ready" subject that only fires once the
 * synchronous `subscribe` call has returned. The subject replays its last
 * signal, so every debounce window opened afterwards sees it right away.
 */
function _coalesceFirstEmittedValue<T>(): MonoTypeOperatorFunction<T> {
  return (source$: Observable<T>): Observable<T> =>
    new Observable<T>((subscriber) => {
      const ready$ = new ReplaySubject<unknown>(1);

      /* Hold values back until the ready signal has been sent. */
      const debounced$ = source$.pipe(debounce(() => ready$));
      const innerSubscription = debounced$.subscribe(subscriber);

      /* The synchronous part of the subscription is over at this point:
       * send the ready signal so the last computed value is released. */
      ready$.next(undefined);

      return () => innerSubscription.unsubscribe();
    });
}
/* Literal-typed booleans: keeps object literals built from them typed as
 * `true` / `false` instead of the wider `boolean`. */
const TRUE: true = true;
const FALSE: false = false;
/**
 * Builds the operator that maps source notifications to strict suspense states.
 *
 * The source is materialized, so next / error / complete all arrive as
 * notification objects which are folded into a state with `scan`. The
 * resulting stream is prefixed with the `pending` state.
 */
function _suspensify<T>(): OperatorFunction<T, Suspense<T>> {
  /* Computes the next state from the previous one and a notification. */
  const toNextState = (
    previous: Suspense<T>,
    notification: ObservableNotification<T>
  ): Suspense<T> => {
    /* Value. */
    if (notification.kind === 'N') {
      return {
        finalized: FALSE,
        hasError: FALSE,
        hasValue: TRUE,
        value: notification.value,
        pending: FALSE,
      };
    }

    /* Error. */
    if (notification.kind === 'E') {
      return {
        finalized: TRUE,
        hasError: TRUE,
        hasValue: FALSE,
        pending: FALSE,
        error: notification.error,
      };
    }

    /* Complete: keep whatever was known so far and mark it as final. */
    return {
      ...previous,
      finalized: TRUE,
      pending: FALSE,
    };
  };

  return (source$: Observable<T>): Observable<Suspense<T>> =>
    source$.pipe(
      materialize(),
      scan<ObservableNotification<T>, Suspense<T>>(toNextState, pending),
      startWith(pending)
    );
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//USEFUL HELPER TO SAFELY CONVERT obs$<T> => Signal<SuspenseLax<T>> THAT HELPS MANAGE ALL POSSIBLE STATES: pending, finalized, value, error
//READ MORE https://dev.to/this-is-angular/managing-rxjs-traffic-with-signals-and-suspensify-pp FOR DETAILS/EXPLAIN USE CASES | |
import { Observable } from "rxjs"; | |
import { suspensify } from "./suspensify.ts"; | |
/**
 * Converts an observable into a signal holding its lax suspense state.
 *
 * The source is piped through `suspensify({ strict: false })`, so the signal's
 * value always has `value` and `error` keys next to the status flags.
 * `requireSync: true` is passed to `toSignal`; presumably this is satisfied by
 * the pending state that the suspensified stream emits on subscription.
 *
 * NOTE(review): `toSignal` is not imported in the visible source. It
 * presumably comes from `@angular/core/rxjs-interop`; confirm and add the import.
 *
 * @param source$ observable whose state should be exposed as a signal.
 */
export function toSuspenseSignal<T>(source$: Observable<T>) {
  const suspense$ = source$.pipe(suspensify({ strict: false }));
  return toSignal(suspense$, { requireSync: true });
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment