Last active
December 17, 2023 22:28
-
-
Save tuanchauict/2c6f95c89152f98d8b7436ac81b65bb7 to your computer and use it in GitHub Desktop.
A small TypeScript imitation of Android's Flow / LiveData
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import {LifecycleObserver, LifecycleOwner} from "./lifecycleowner"; | |
/**
 * Receiver of value notifications of type `T`.
 */
interface Observer<T> {
  /**
   * Invoked with the value being delivered.
   * @param value The value to handle.
   */
  onChange(value: T): void;
}
/**
 * Observer that forwards every defined value to a plain callback.
 * `undefined` is treated as "no value" and is silently dropped.
 */
class SimpleObserver<T> implements Observer<T> {
  private callback: (value: T) => void;

  /**
   * @param callback Function invoked for each value other than `undefined`.
   */
  constructor(callback: (value: T) => void) {
    this.callback = callback;
  }

  /**
   * Hands `value` to the callback unless it is `undefined`.
   * @param value The value being delivered.
   */
  onChange(value: T): void {
    if (value === undefined) {
      return;
    }
    this.callback(value);
  }
}
/**
 * Observer decorator that coalesces bursts of values.
 *
 * The first value of a burst schedules a single tick; values arriving before
 * the tick fires only overwrite the pending value. When the tick fires, the
 * most recent value is forwarded to the wrapped observer.
 *
 * With a timeout of `0` the tick is scheduled with `requestAnimationFrame`
 * when the host provides it, and with `setTimeout(…, 0)` otherwise.
 */
class ThrottleObserver<T> implements Observer<T> {
  /** Handle of the pending tick, or `undefined` when nothing is scheduled. */
  private timeoutId: ReturnType<typeof setTimeout> | number | undefined;
  /** Latest value received; delivered by the next tick. */
  private currentValue: T | undefined;

  /**
   * @param observer The observer that receives the throttled values.
   * @param timeout Delay of a tick in milliseconds; `0` means "next animation frame".
   * @throws Error when `timeout` is negative.
   */
  constructor(private observer: Observer<T>, private timeout: number) {
    if (timeout < 0) {
      throw new Error("Timeout must be >= 0");
    }
  }

  /**
   * Records `value` as the pending value and schedules a tick if none is pending.
   * @param value The latest value.
   */
  onChange(value: T): void {
    this.currentValue = value;
    if (this.timeoutId !== undefined) {
      // A tick is already scheduled; it will pick up the value stored above.
      return;
    }
    const tick = (): void => this.timeoutTick();
    // Look the function up dynamically so that hosts without it do not throw a ReferenceError.
    const raf: unknown = Reflect.get(globalThis, "requestAnimationFrame");
    if (this.timeout === 0 && typeof raf === "function") {
      this.timeoutId = raf.call(globalThis, tick) as number;
    } else {
      this.timeoutId = setTimeout(tick, this.timeout);
    }
  }

  /**
   * Delivers the pending value to the wrapped observer.
   */
  private timeoutTick(): void {
    // Release the slot first: if delivery is skipped or the observer throws,
    // later values must still be able to schedule a new tick.
    this.timeoutId = undefined;
    const newValue = this.currentValue;
    if (newValue === undefined) {
      return;
    }
    this.observer.onChange(newValue);
  }
}
/**
 * An observable value holder.
 *
 * A flow created with `new Flow(...)` is a mutable root whose value is set through
 * the `value` setter. Flows returned by `map`, `distinctUntilChanged` and `throttle`
 * are immutable and derive their value from a parent flow.
 *
 * A derived flow only receives pushed updates from its parent while it has
 * subscribers (direct observers, subscribed descendants, or after
 * `makeValueUpdateReactively()`). While it has none, its value is computed on
 * demand from the parent and nothing is cached.
 *
 * `undefined` is treated as "no value" throughout and is never delivered to observers.
 */
export class Flow<T> {
  /** Stored value of a root flow, or the cached value of a derived flow with subscribers. */
  private valueInternal: T | undefined;
  /** Observers registered through `observe`. */
  private observers: Observer<T>[] = [];
  /** Observers that feed derived flows, keyed by the derived flow. */
  private internalObservers: Map<Flow<unknown>, Observer<T>> = new Map();
  private isImmutable = false;
  /** The flow this one derives from; `undefined` for a root flow. */
  private parent: Flow<unknown> | undefined;
  /** Maps a parent value to a value of this flow; only set on derived flows. */
  private transform: ((a: unknown) => T) | undefined;

  /**
   * A flag that indicates whether the value of this flow should be updated when the parent flow's value changes
   * regardless of whether this flow has subscribers.
   * Turning this flag on will cause the value of this flow to be updated even if there are no subscribers.
   * Only use this flag when reading the current value of the flow is required rather than observing the flow.
   */
  private isValueUpdatedReactivelyRequired = false;

  /** Creates an immutable flow whose on-demand value is `transform(parent.value)`. */
  private static immutable<T0, T>(parent: Flow<T0>, transform: (value: T0) => T): Flow<T> {
    const flow = new Flow<T>();
    flow.parent = parent;
    // The field is typed against `unknown` because the parent's type is erased once stored.
    flow.transform = transform as (a: unknown) => T;
    flow.isImmutable = true;
    return flow;
  }

  /**
   * @param value Optional initial value; omit it (or pass `undefined`) to start without a value.
   */
  constructor(value?: T) {
    this.valueInternal = value;
  }

  /**
   * Sets the value and notifies observers and subscribed derived flows.
   * @throws Error when this flow is a derived (immutable) flow.
   */
  set value(value: T) {
    if (this.isImmutable) {
      throw new Error("Flow is immutable");
    }
    this.setValueInternal(value);
  }

  /**
   * The current value, or `undefined` when there is none.
   * For a derived flow without a cached value this is computed from the parent's current value.
   */
  get value(): T | undefined {
    if (this.valueInternal !== undefined) {
      return this.valueInternal;
    }
    return this.computeFromParent();
  }

  /** Computes this flow's value from its parent, without touching the cache. */
  private computeFromParent(): T | undefined {
    if (this.parent === undefined || this.transform === undefined) {
      // Root flow: there is nothing to derive from.
      return undefined;
    }
    const parentValue = this.parent.value;
    // "No value" propagates as-is; the transform only ever sees real values.
    return parentValue === undefined ? undefined : this.transform(parentValue);
  }

  /** Stores `value` and pushes it to observers and to derived flows that have subscribers. */
  private setValueInternal(value: T): void {
    this.valueInternal = value;
    // Dispatch over a snapshot: an observer may be removed while we are notifying.
    for (const observer of [...this.observers]) {
      if (this.observers.includes(observer)) {
        this.delegateValueToObserver(observer, value);
      }
    }
    for (const [flow, observer] of this.internalObservers) {
      if (flow.hasSubscribers()) {
        this.delegateValueToObserver(observer, value);
      }
    }
  }

  /**
   * Makes the value of this flow updated when the parent flow's value changes regardless of whether this flow has
   * subscribers.
   * This method is useful when reading the current value of the flow is required rather than observing the flow.
   */
  makeValueUpdateReactively(): void {
    this.isValueUpdatedReactivelyRequired = true;
    // Seed the cache with the current value; safe to call repeatedly.
    this.valueInternal = this.value;
  }

  /**
   * Returns an immutable flow whose values are `transform` applied to this flow's values.
   * @param transform Mapping from a value of this flow to a value of the new flow.
   */
  map<R>(transform: (value: T) => R): Flow<R> {
    const flow = Flow.immutable(this, transform);
    this.addInternalObserver(
      flow,
      new SimpleObserver<T>((value: T) => {
        flow.setValueInternal(transform(value));
      })
    );
    return flow;
  }

  /**
   * Returns an immutable flow that only emits when the incoming value differs (`!==`)
   * from the value it currently holds.
   */
  distinctUntilChanged(): Flow<T> {
    const flow = Flow.immutable(this, (value: T) => value);
    this.addInternalObserver(
      flow,
      new SimpleObserver<T>((value: T) => {
        if (value !== flow.valueInternal && value !== undefined) {
          flow.setValueInternal(value);
        }
      })
    );
    return flow;
  }

  /**
   * Returns an immutable flow that receives this flow's values through a `ThrottleObserver`.
   * @param timeout Value handed to `ThrottleObserver`; a negative timeout returns this flow unchanged.
   */
  throttle(timeout: number): Flow<T> {
    if (timeout < 0) {
      return this;
    }
    const flow = Flow.immutable(this, (value: T) => value);
    this.addInternalObserver(
      flow,
      new ThrottleObserver<T>(
        new SimpleObserver<T>((value: T) => {
          flow.setValueInternal(value);
        }),
        timeout
      )
    );
    return flow;
  }

  /**
   * Observe this flow.
   * The observer will be called immediately with the current value of the flow, if there is one.
   * @param lifecycleOwner The lifecycle owner that the observer will be attached to. The observer will be removed
   * when the lifecycle owner is stopped. If the lifecycle owner is already not active, the observer will not be added.
   * @param observer Callback invoked with every value other than `undefined`.
   */
  observe(lifecycleOwner: LifecycleOwner, observer: (value: T) => void): void {
    if (!lifecycleOwner.isActive) {
      return;
    }
    const simpleObserver = new SimpleObserver<T>(observer);
    this.observers.push(simpleObserver);
    lifecycleOwner.addObserver(
      new OnStopLifecycleObserver(() => {
        const index = this.observers.indexOf(simpleObserver);
        if (index !== -1) {
          this.observers.splice(index, 1);
        }
        this.releaseIdleCaches();
      })
    );
    // Read through the getter: an idle derived flow has no cache yet and must derive
    // its current value from the parent.
    const current = this.value;
    if (current !== undefined) {
      // Remember what was delivered so later comparisons (distinctUntilChanged) see it.
      this.valueInternal = current;
      simpleObserver.onChange(current);
    }
  }

  private addInternalObserver(key: Flow<unknown>, observer: Observer<T>): void {
    this.internalObservers.set(key, observer);
  }

  private delegateValueToObserver(observer: Observer<T>, value: T | undefined): void {
    if (value !== undefined) {
      observer.onChange(value);
    }
  }

  /**
   * Stops receiving updates from the parent flow.
   * This method is useful when you want to stop receiving updates from the parent flow once this flow is no longer
   * used.
   */
  stopReceivingUpdates(): void {
    const parent = this.parent;
    if (parent !== undefined) {
      parent.internalObservers.delete(this);
      // The parent may have just lost its last subscriber.
      parent.releaseIdleCaches();
    }
  }

  /**
   * Drops the cached value of this flow and of each ancestor that no longer has subscribers.
   * Such flows stop receiving pushed updates, so a kept cache would go stale.
   * Root flows are never cleared: their stored value is the source of truth.
   */
  private releaseIdleCaches(): void {
    let node: Flow<unknown> | undefined = this;
    while (node !== undefined && node.parent !== undefined && !node.hasSubscribers()) {
      node.valueInternal = undefined;
      node = node.parent;
    }
  }

  /** True when this flow, or any flow derived from it, must be kept up to date. */
  private hasSubscribers(): boolean {
    if (this.isValueUpdatedReactivelyRequired || this.observers.length > 0) {
      return true;
    }
    for (const child of this.internalObservers.keys()) {
      if (child.hasSubscribers()) {
        return true;
      }
    }
    return false;
  }
}
/**
 * Lifecycle observer that reacts only to the stop event by running a callback.
 */
class OnStopLifecycleObserver implements LifecycleObserver {
  private callback: () => void;

  /**
   * @param callback Function to run each time `onStop` is invoked.
   */
  constructor(callback: () => void) {
    this.callback = callback;
  }

  /** Runs the callback supplied at construction. */
  onStop(): void {
    this.callback();
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import {Flow} from "./flow"; | |
import {LifecycleOwner} from "./lifecycleowner"; | |
// Demo: drives a root flow with a counter and observes three derived flows.
const lifecycleOwner = new LifecycleOwner();
lifecycleOwner.onStart();

const flow = new Flow<number>();
const flow2 = flow.map(value => {
  console.log("flow 2 map", value);
  return "Hello " + value;
});
const flow3 = flow.map(value => {
  console.log("flow 3 map", value);
  return Math.trunc(value / 2);
}).distinctUntilChanged();
const flow4 = flow.throttle(2500);

// Subscribe late so that several values have already been produced.
setTimeout(() => {
  console.log("flow2.observe");
  flow2.observe(lifecycleOwner, value => console.log("flow2", value));
  flow3.observe(lifecycleOwner, value => console.log("==", value));
  flow4.observe(lifecycleOwner, value => console.log("throttle", value));
}, 2000);

// Producer: emits 0, 1, 2, ... every 500 ms.
let count = 0;
const producer = setInterval(() => {
  flow.value = count++;
}, 500);

// Writing to a derived flow is rejected. Catch the error so that the
// remaining steps of the demo still run on hosts where an uncaught
// exception in a timer terminates the process.
setTimeout(() => {
  try {
    flow2.value = "100";
  } catch (error: unknown) {
    console.log("flow2.value rejected:", error instanceof Error ? error.message : error);
  }
}, 5000);

setTimeout(() => {
  console.log("stopReceivingUpdates");
  flow3.stopReceivingUpdates();
}, 7000);

// End of the demo: detach all observers and stop producing values so the
// process can exit.
setTimeout(() => {
  console.log("onStop");
  lifecycleOwner.onStop();
  clearInterval(producer);
}, 10000);
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Listener for lifecycle transitions. Both hooks are optional; an observer
 * implements only the ones it cares about.
 */
export interface LifecycleObserver {
  /** Called when the lifecycle is started. */
  onStart?(): void;

  /** Called when the lifecycle is stopped. */
  onStop?(): void;
}
/**
 * States of a lifecycle owner.
 */
export enum LifecycleState {
  /** Created; not started yet. */
  INITIALIZED = 0,
  /** Started and currently active. */
  STARTED = 1,
  /** Stopped. */
  STOPPED = 2,
}
/**
 * Owner of a simple start/stop lifecycle that notifies registered observers
 * of each transition.
 */
export class LifecycleOwner {
  private observers: LifecycleObserver[] = [];
  private state: LifecycleState = LifecycleState.INITIALIZED;

  /** True only while the owner is in the STARTED state. */
  get isActive(): boolean {
    return this.state === LifecycleState.STARTED;
  }

  /**
   * Registers an observer.
   * If the owner is already started, the observer's `onStart` is called right away.
   * If the owner is already stopped, the observer is ignored.
   * @param observer The observer to register.
   */
  addObserver(observer: LifecycleObserver): void {
    if (this.state === LifecycleState.STOPPED) {
      return;
    }
    this.observers.push(observer);
    if (this.state === LifecycleState.STARTED) {
      observer.onStart?.();
    }
  }

  /**
   * Called when the LifecycleOwner is ready to start.
   * Do not override this method, override onStartInternal instead.
   */
  onStart(): void {
    if (this.state === LifecycleState.STARTED) {
      return;
    }
    this.state = LifecycleState.STARTED;
    // Notify a snapshot: an observer registered from inside a callback is already
    // started by addObserver and must not be started a second time by this loop.
    for (const observer of [...this.observers]) {
      observer.onStart?.();
    }
    this.onStartInternal();
  }

  /** Hook for subclasses; runs after the observers have been notified of the start. */
  protected onStartInternal(): void {
  }

  /**
   * Called when the LifecycleOwner is ready to stop.
   * Do not override this method, override onStopInternal instead.
   */
  onStop(): void {
    if (this.state === LifecycleState.STOPPED) {
      return;
    }
    this.state = LifecycleState.STOPPED;
    this.onStopInternal();
    // Snapshot for the same reason as in onStart: callbacks must not disturb the iteration.
    for (const observer of [...this.observers]) {
      observer.onStop?.();
    }
  }

  /** Hook for subclasses; runs before the observers are notified of the stop. */
  protected onStopInternal(): void {
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment