Skip to content

Instantly share code, notes, and snippets.

@tuanchauict
Last active December 17, 2023 22:28
Show Gist options
  • Save tuanchauict/2c6f95c89152f98d8b7436ac81b65bb7 to your computer and use it in GitHub Desktop.
Save tuanchauict/2c6f95c89152f98d8b7436ac81b65bb7 to your computer and use it in GitHub Desktop.
Mimics Android's Flow and LiveData
import {LifecycleObserver, LifecycleOwner} from "./lifecycleowner";
/**
 * Receiver of value updates of type `T`.
 * Implemented in this file by `SimpleObserver` and `ThrottleObserver`.
 */
interface Observer<T> {
/** Called with the value being delivered to this observer. */
onChange(value: T): void;
}
/**
 * Observer that forwards each received value to a plain callback.
 * `undefined` is treated as "no value" and is never forwarded.
 */
class SimpleObserver<T> implements Observer<T> {
    private readonly callback: (value: T) => void;

    /**
     * @param callback Function invoked with every defined value passed to `onChange`.
     */
    constructor(callback: (value: T) => void) {
        this.callback = callback;
    }

    /**
     * Forwards `value` to the callback unless it is `undefined`.
     */
    onChange(value: T): void {
        if (value === undefined) {
            return;
        }
        this.callback(value);
    }
}
/**
 * Observer decorator that coalesces bursts of values.
 *
 * The first `onChange` call schedules a single delivery; values received while
 * that delivery is pending only overwrite the remembered value, so the wrapped
 * observer gets the most recent value once per window.
 *
 * With `timeout === 0` the delivery is scheduled with `requestAnimationFrame`
 * when the host provides it, otherwise with `setTimeout(…, 0)`.
 */
class ThrottleObserver<T> implements Observer<T> {
    /** Handle of the pending delivery, or `undefined` when nothing is scheduled. */
    private timeoutId: ReturnType<typeof setTimeout> | number | undefined;
    /** Most recent value received through `onChange`. */
    private currentValue: T | undefined;

    /**
     * @param observer Wrapped observer that receives the throttled values.
     * @param timeout Throttle window in milliseconds; must be >= 0.
     * @throws Error when `timeout` is negative.
     */
    constructor(private observer: Observer<T>, private timeout: number) {
        if (timeout < 0) {
            throw new Error("Timeout must be >= 0");
        }
    }

    /**
     * Remembers `value` and schedules a delivery unless one is already pending.
     */
    onChange(value: T) {
        this.currentValue = value;
        if (this.timeoutId !== undefined) {
            return;
        }
        const tick = () => this.timeoutTick();
        // Look the function up on globalThis so hosts without it (e.g. Node) do not
        // hit a ReferenceError.
        const raf = (globalThis as unknown as {
            requestAnimationFrame?: (callback: () => void) => number;
        }).requestAnimationFrame;
        if (this.timeout === 0 && typeof raf === "function") {
            this.timeoutId = raf(tick);
        } else {
            this.timeoutId = setTimeout(tick, this.timeout);
        }
    }

    /**
     * Delivers the remembered value to the wrapped observer.
     */
    private timeoutTick() {
        // Release the slot BEFORE delivering: if the wrapped observer throws, or emits
        // re-entrantly into this throttle, the next onChange must be able to schedule
        // a new delivery instead of being silently dropped.
        this.timeoutId = undefined;
        const newValue = this.currentValue;
        if (newValue === undefined) {
            return;
        }
        this.observer.onChange(newValue);
    }
}
/**
 * A small observable value holder in the spirit of Android's Flow / LiveData.
 *
 * A flow created with `new Flow(...)` is mutable through the `value` setter.
 * Flows returned by `map`, `distinctUntilChanged` and `throttle` are derived:
 * they are immutable, keep a reference to their parent, and only receive pushed
 * updates while they (or one of their descendants) have subscribers. Without a
 * cached value, a derived flow computes its value lazily from its parent.
 *
 * `undefined` is used throughout as the "no value" marker and is never delivered
 * to observers.
 */
export class Flow<T> {
    /** Last value stored on this flow; `undefined` means no value is cached. */
    private valueInternal: T | undefined;
    /** Observers registered through `observe`. */
    private observers: Observer<T>[] = [];
    /** Observers feeding derived flows, keyed by the derived flow. */
    private internalObservers: Map<Flow<unknown>, Observer<T>> = new Map();
    private isImmutable = false;
    /** Parent of a derived flow; `undefined` for a root flow. */
    private parent: Flow<unknown> | undefined;
    /** Computes this flow's value from the parent's value; only set on derived flows. */
    private transform: ((a: unknown) => T) | undefined;
    /**
     * A flag that indicates whether the value of this flow should be updated when the parent flow's value changes
     * regardless of whether this flow has subscribers.
     * Turning this flag on will cause the value of this flow to be updated even if there are no subscribers.
     * Only use this flag when reading the current value of the flow is required rather than observing the flow.
     */
    private isValueUpdatedReactivelyRequired = false;

    /** Creates an immutable flow derived from `parent` through `transform`. */
    private static immutable<T0, T>(parent: Flow<T0>, transform: (value: T0) => T): Flow<T> {
        const flow = new Flow<T>();
        flow.parent = parent;
        // The transform is only ever applied to values read from `parent`, so widening
        // its parameter type for storage is safe.
        flow.transform = transform as (a: unknown) => T;
        flow.isImmutable = true;
        return flow;
    }

    /**
     * @param value Optional initial value; omit it (or pass `undefined`) to start without a value.
     */
    constructor(value?: T) {
        this.valueInternal = value;
    }

    /**
     * Sets the value and notifies observers.
     * @throws Error when this flow is a derived (immutable) flow.
     */
    set value(value: T) {
        if (this.isImmutable) {
            throw new Error("Flow is immutable");
        }
        this.setValueInternal(value);
    }

    /**
     * Returns the cached value, or, for a derived flow without a cached value, the
     * value computed from the parent. Returns `undefined` when no value is available
     * (a root flow that was never set, or a derived flow whose parent has no value).
     */
    get value(): T | undefined {
        if (this.valueInternal !== undefined) {
            return this.valueInternal;
        }
        const parent = this.parent;
        const transform = this.transform;
        if (parent === undefined || transform === undefined) {
            // Root flow without a value: there is nothing to derive it from.
            return undefined;
        }
        const parentValue = parent.value;
        // Never feed the "no value" marker into a user transform.
        return parentValue === undefined ? undefined : transform(parentValue);
    }

    /** Stores `value` and pushes it to direct observers and to subscribed derived flows. */
    private setValueInternal(value: T) {
        this.valueInternal = value;
        // Iterate over a snapshot: a callback may stop a lifecycle owner, which removes
        // observers from the live array. Observers removed meanwhile are skipped.
        for (const observer of [...this.observers]) {
            if (this.observers.includes(observer)) {
                this.delegateValueToObserver(observer, value);
            }
        }
        for (const [flow, observer] of this.internalObservers) {
            if (flow.hasSubscribers()) {
                this.delegateValueToObserver(observer, value);
            }
        }
    }

    /**
     * Makes the value of this flow updated when the parent flow's value changes regardless of whether this flow has
     * subscribers.
     * This method is useful when reading the current value of the flow is required rather than observing the flow.
     */
    makeValueUpdateReactively() {
        this.isValueUpdatedReactivelyRequired = true;
        // Update the value when this flag is turned on.
        // This does nothing if the value is already up-to-date, so it's safe to call this method multiple times.
        // Otherwise, this method will update the value to the latest value of the parent flow.
        this.valueInternal = this.value;
    }

    /**
     * Returns an immutable flow whose value is `transform` applied to this flow's value.
     */
    map<R>(transform: (value: T) => R): Flow<R> {
        const flow = Flow.immutable(this, transform);
        this.addInternalObserver(
            flow,
            new SimpleObserver<T>((value) => {
                flow.setValueInternal(transform(value));
            })
        );
        return flow;
    }

    /**
     * Returns an immutable flow that only emits when the pushed value differs (`!==`)
     * from the value it currently caches.
     */
    distinctUntilChanged(): Flow<T> {
        const flow = Flow.immutable(this, (value: T) => value);
        this.addInternalObserver(flow, new SimpleObserver<T>((value) => {
            if (value !== flow.valueInternal && value !== undefined) {
                flow.setValueInternal(value);
            }
        }));
        return flow;
    }

    /**
     * Returns an immutable flow that receives this flow's updates through a
     * `ThrottleObserver` with the given timeout.
     * @param timeout Throttle window in milliseconds. A negative timeout disables
     * throttling and returns this flow itself.
     */
    throttle(timeout: number): Flow<T> {
        if (timeout < 0) {
            return this;
        }
        const flow = Flow.immutable(this, (value: T) => value);
        this.addInternalObserver(flow, new ThrottleObserver(new SimpleObserver<T>((value) => {
            flow.setValueInternal(value);
        }), timeout));
        return flow;
    }

    /**
     * Observe this flow.
     * The observer will be called immediately with the current value of the flow, if it has one.
     * @param lifecycleOwner The lifecycle owner that the observer will be attached to. The observer will be removed
     * when the lifecycle owner is stopped. If the lifecycle owner is already not active, the observer will not be added.
     * @param observer
     */
    observe(lifecycleOwner: LifecycleOwner, observer: (value: T) => void) {
        if (!lifecycleOwner.isActive) {
            return;
        }
        const simpleObserver = new SimpleObserver(observer);
        this.observers.push(simpleObserver);
        lifecycleOwner.addObserver(new OnStopLifecycleObserver(() => {
            const index = this.observers.indexOf(simpleObserver);
            if (index !== -1) {
                this.observers.splice(index, 1);
            }
            // Only a derived flow may drop its cache: once it has no subscribers the parent
            // stops pushing updates, so the cache would go stale, and the value can be
            // recomputed lazily. A root flow's value is the source of truth and is kept.
            if (this.parent !== undefined && !this.hasSubscribers()) {
                this.valueInternal = undefined;
            }
        }));
        if (this.valueInternal === undefined) {
            // A derived flow that had no subscribers has no cached value yet; compute it
            // now so the new observer really starts with the current value and later
            // pushes are compared against it.
            this.valueInternal = this.value;
        }
        this.delegateValueToObserver(simpleObserver, this.valueInternal);
    }

    /** Registers the observer that feeds the derived flow `key`. */
    private addInternalObserver(key: Flow<unknown>, observer: Observer<T>) {
        this.internalObservers.set(key, observer);
    }

    /** Delivers `value` to `observer` unless it is the "no value" marker. */
    private delegateValueToObserver(observer: Observer<T>, value: T | undefined) {
        if (value !== undefined) {
            observer.onChange(value);
        }
    }

    /**
     * Stops receiving updates from the parent flow.
     * This method is useful when you want to stop receiving updates from the parent flow once this flow is no longer
     * used.
     */
    stopReceivingUpdates() {
        const parent = this.parent;
        if (parent !== undefined) {
            parent.internalObservers.delete(this);
        }
    }

    /**
     * Whether anything depends on pushed updates of this flow: the reactive flag, a
     * direct observer, or any derived flow that itself has subscribers.
     */
    private hasSubscribers(): boolean {
        if (this.isValueUpdatedReactivelyRequired || this.observers.length > 0) {
            return true;
        }
        for (const child of this.internalObservers.keys()) {
            if (child.hasSubscribers()) {
                return true;
            }
        }
        return false;
    }
}
/**
 * Lifecycle observer that only reacts to the stop event by running a callback.
 */
class OnStopLifecycleObserver implements LifecycleObserver {
    private readonly callback: () => void;

    /**
     * @param callback Function to run each time `onStop` is invoked.
     */
    constructor(callback: () => void) {
        this.callback = callback;
    }

    /** Runs the stored callback. */
    onStop() {
        this.callback();
    }
}
import {Flow} from "./flow";
import {LifecycleOwner} from "./lifecycleowner";
// Demo script: wires a few derived flows to a producer and walks through
// subscribing, an illegal write, detaching from the parent, and stopping.

const lifecycleOwner = new LifecycleOwner();
lifecycleOwner.onStart();

const flow = new Flow<number>();

const flow2 = flow.map(value => {
    console.log("flow 2 map", value);
    return "Hello " + value;
});

const flow3 = flow.map(value => {
    console.log("flow 3 map", value);
    return Math.trunc(value / 2);
}).distinctUntilChanged();

const flow4 = flow.throttle(2500);

// Subscribe late, so the first few produced values go out while nobody observes.
setTimeout(() => {
    console.log("flow2.observe");
    flow2.observe(lifecycleOwner, value => console.log("flow2", value));
    flow3.observe(lifecycleOwner, value => console.log("==", value));
    flow4.observe(lifecycleOwner, value => console.log("throttle", value));
}, 2000);

// Producer: pushes an increasing counter into the root flow every 500 ms.
let count = 0;
const producer = setInterval(() => {
    flow.value = count++;
}, 500);

// Writing to a derived flow is rejected. Catch the error so that it is reported
// without aborting the remaining steps of the demo (an uncaught error inside a
// timer callback terminates a Node process).
setTimeout(() => {
    try {
        flow2.value = "100";
    } catch (error) {
        console.error(error);
    }
}, 5000);

setTimeout(() => {
    console.log("stopReceivingUpdates");
    flow3.stopReceivingUpdates();
}, 7000);

setTimeout(() => {
    console.log("onStop");
    lifecycleOwner.onStop();
    // Nothing observes the flow any more; stop the producer so the demo can end.
    clearInterval(producer);
}, 10000);
/**
 * Listener for the start/stop events of a `LifecycleOwner`.
 * Both hooks are optional; `LifecycleOwner` invokes them with optional calls.
 */
export interface LifecycleObserver {
/**
 * Invoked by `LifecycleOwner.onStart`, or right away by `LifecycleOwner.addObserver`
 * when the owner is already started.
 */
onStart?(): void;
/** Invoked by `LifecycleOwner.onStop`. */
onStop?(): void;
}
/**
 * States tracked by `LifecycleOwner`.
 */
export enum LifecycleState {
// State a LifecycleOwner is created in.
INITIALIZED,
// Set by LifecycleOwner.onStart; the only state in which `isActive` is true.
STARTED,
// Set by LifecycleOwner.onStop; addObserver ignores new observers in this state.
STOPPED,
}
/**
 * Holds a start/stop lifecycle state and notifies registered observers when it changes.
 */
export class LifecycleOwner {
    private observers: LifecycleObserver[] = [];
    private state: LifecycleState = LifecycleState.INITIALIZED;

    /** `true` only while the owner is in the STARTED state. */
    get isActive(): boolean {
        return this.state === LifecycleState.STARTED;
    }

    /**
     * Registers an observer.
     * The observer is ignored when the owner is already stopped. When the owner is
     * already started, the observer's `onStart` hook is invoked immediately.
     */
    addObserver(observer: LifecycleObserver) {
        if (this.state === LifecycleState.STOPPED) {
            return;
        }
        this.observers.push(observer);
        if (this.state === LifecycleState.STARTED) {
            observer.onStart?.();
        }
    }

    /**
     * Called when the LifecycleOwner is ready to start.
     * Do not override this method, override onStartInternal instead.
     */
    onStart() {
        if (this.state === LifecycleState.STARTED) {
            return;
        }
        this.state = LifecycleState.STARTED;
        // Iterate over a snapshot: an observer added from inside an onStart callback is
        // already started by addObserver (the state is STARTED by now) and must not be
        // started a second time by this loop.
        for (const observer of [...this.observers]) {
            observer.onStart?.();
        }
        this.onStartInternal();
    }

    /** Hook for subclasses; runs after the observers have been notified of the start. */
    protected onStartInternal() {
    }

    /**
     * Called when the LifecycleOwner is ready to stop.
     * Do not override this method, override onStopInternal instead.
     */
    onStop() {
        if (this.state === LifecycleState.STOPPED) {
            return;
        }
        this.state = LifecycleState.STOPPED;
        this.onStopInternal();
        // Snapshot for symmetry with onStart, so callbacks cannot disturb the iteration.
        for (const observer of [...this.observers]) {
            observer.onStop?.();
        }
    }

    /** Hook for subclasses; runs before the observers are notified of the stop. */
    protected onStopInternal() {
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment