Skip to content

Instantly share code, notes, and snippets.

@ovcharik
Created May 23, 2024 09:45
Show Gist options
  • Save ovcharik/11c151512c1c4dc201a2c5feec617fdb to your computer and use it in GitHub Desktop.
Save ovcharik/11c151512c1c4dc201a2c5feec617fdb to your computer and use it in GitHub Desktop.
ObservableStorage
import {
BehaviorSubject,
combineLatest,
defaultIfEmpty,
filter,
iif,
map,
Observable,
observeOn,
of,
queueScheduler,
ReplaySubject,
share,
shareReplay,
startWith,
Subject,
switchMap,
take,
} from 'rxjs';
/**
* ObservableStorage служит для формирования единого потока изменений из множества взаимно-зависимых
* источник.
*
* Основная проблема при работе с множеством потоков, это их синхронизация. В первую очередь, данная
* реализация предназначена для работы с потоками, в которых есть рекурсивные зависимости.
*
* При помощи DI, ангуляр позволяет, очень гибко настраивать различные связи между компонентами. Но
* внедрение зависимостей не позволяет устанавливать отношения между потоками данных в таких связях.
* Например, отдельные инструменты для отслеживания потока данных, есть в реактивных формах. И это
* достаточно объемный инструментарий, работающих на схожих принципах, что и это хранилище. Основная
* проблема в том, что реактивные формы имеют ограниченную специализацию. Данное же решение
* полностью полагается на RxJs, что дает большую гибкость в плане вариантов использования.
*
* В примере ниже показано, как с помощью данного хранилища, организуется система глубокого
* отслеживания изменений в древовидной структуре.
*
* @example
*
* ```ts
* type Node = { title: string; children: Node[] };
*
* class NodeBuilder {
* protected readonly childrenStorage$ = new ObservableStorage<Node>();
*
* protected readonly title$ = new BehaviorSubject<string>('');
* protected readonly children$ = this.childrenStorage$.pipe(startWith<Node[]>([]));
*
* public readonly node$ = combineLatest([this.title$, this.children$]).pipe(
* debounceTime(0),
* map(([title, children]) => ({ title, children }))
* );
*
* constructor(title: string, protected parent?: NodeBuilder) {
* this.setTitle(title);
* if (this.parent) {
* this.parent.attachChild(this.node$);
* }
* }
*
* protected attachChild(child: Observable<Node>) {
* this.childrenStorage$.attachSource(child, child);
* }
*
* setTitle(title: string) {
* this.title$.next(title);
* }
* }
*
* const root = new NodeBuilder('root');
* const foo = new NodeBuilder('Foo', root);
* const bar = new NodeBuilder('Bar', root);
* const baz = new NodeBuilder('Baz', bar);
*
* foo.setTitle('FOOOOO!!!');
* baz.setTitle('BarBaz');
*
* root.node$.subscribe(console.log);
* // { "title": "root", "children": [{ "title": "FOOOOO!!!", "children": [] }, { ... }] }
* ```
*/
export class ObservableStorage<Data, Key = unknown> extends Observable<Data[]> {
private readonly storage = new Map<Key, Observable<Data | void>>();
/** Контроль подключения и отключения источников данных */
private readonly invalidated$ = new Subject<void>();
/** Передает источник данных, который был отключен от хранилища */
private readonly detached$ = new Subject<Observable<Data | void>>();
/** Текущее состояние всех источников данных, упакованное в массив. */
private readonly inputs$ = this.invalidated$.pipe(
map(() => [...this.storage.values()]),
switchMap((sources) => combineLatest(sources).pipe(defaultIfEmpty([] as Data[]))),
map(<T>(results: T[]) => results.filter((result: T): result is Exclude<T, void> => !!result)),
shareReplay({ bufferSize: 1, refCount: true })
);
/** Промежуточная точка для передачи изменений подписчикам хранилища */
private readonly output$ = new BehaviorSubject<Data[]>([]);
/**
* Внутренняя подписка, для передачи изменений из `inputs$` в `output$`.
*
* Также данная подписка устанавливает единую зависимость для внешних подписчиков. Завершение
* данной подписки, приведет к завершению всех внешних подписок.
*/
private readonly inputsSubscription = this.inputs$.subscribe(this.output$);
constructor() {
// При помощи конструктора родительского класса Observable, устанавливаются правила подключения
// внешних слушателей к внутреннему потоку изменений хранилища
super((outputSubscriber) => {
const outputSubscription = this.output$.subscribe(outputSubscriber);
this.inputsSubscription.add(outputSubscription);
return () => outputSubscription.unsubscribe();
});
}
/**
* Завершение работы хранилища. Хранилище будет отключено от всех источников данных, добавленных в
* хранилище. Также будут отключены все внешние подписчики хранилища. При попытке подписаться к
* выключенному хранилищу, вернется последнее известное состояние.
*/
complete() {
this.storage.clear();
this.inputsSubscription.unsubscribe();
this.invalidated$.complete();
this.detached$.complete();
this.output$.complete();
}
/**
* Подключение источника данных к хранилищу.
*
* При подключении источника, происходит настройка синхронизации потока изменений источника, с
* потоками других источников, добавленных в хранилище. Ключевые моменты здесь - это создание
* подписки с многоадресной рассылкой, и контроль за моментом завершения отслеживания
* изменений.
*
* Внешние подписчики продолжат получать последнее значение источника, даже после завершения его
* потока изменений. Передача значений прекратится только после отключения источника от хранилища.
*
* @param key ключ для идентификации источника данных
* @param source источник данных
*/
attachSource(key: Key, source: Observable<Data>) {
const storedSource = this.storage.get(key);
if (storedSource) this.detached$.next(storedSource);
const waitDetach$: Observable<boolean> = this.detached$.pipe(
filter((detached) => detached === sharedSource),
map(Boolean),
take(1)
);
const detachNotifier$: Observable<boolean> = iif(
() => this.storage.get(key) === sharedSource,
waitDetach$,
of(true)
);
const sharedSource = source.pipe(
observeOn(queueScheduler),
startWith(void 0),
share({
connector: () => new ReplaySubject(1),
resetOnComplete: () => detachNotifier$,
resetOnRefCountZero: () => detachNotifier$,
})
);
this.storage.set(key, sharedSource);
this.invalidated$.next();
}
/**
* Отключение источника данных от хранилища.
*
* @param key ключ для идентификации источника данных
*/
detachSource(key: Key) {
const storedSource = this.storage.get(key);
if (!storedSource) return;
this.storage.delete(key);
this.detached$.next(storedSource);
this.invalidated$.next();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment