Skip to content

Instantly share code, notes, and snippets.

@christianalfoni
Created May 4, 2019 06:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save christianalfoni/f85aa1079195192901decb5ef19ee3a2 to your computer and use it in GitHub Desktop.
Save christianalfoni/f85aa1079195192901decb5ef19ee3a2 to your computer and use it in GitHub Desktop.
import 'dart:async';
import 'package:flutter/material.dart';
import 'package:rxdart/rxdart.dart' as rx;
/// The [Observer] that is currently collecting dependencies, or null if none.
///
/// [Observer.getStreamBuilder] and [Computed.get] point this at themselves
/// while their callback runs and put the previous value back afterwards;
/// [Observable.get] registers its stream with whichever observer is set here.
Observer currentObserver;
/// Tracks the streams read while a callback runs and republishes their events
/// on a single subject.
class Observer {
  /// Live subscriptions, keyed by the stream each one listens to.
  final Map<rx.Observable, StreamSubscription> _subscriptions = {};

  /// Receives every event forwarded from the tracked streams.
  final rx.BehaviorSubject _subject = rx.BehaviorSubject();

  /// Creates an observer that drops all of its subscriptions when the
  /// subject's `onCancel` callback fires.
  Observer() {
    _subject.onCancel = _clear;
  }

  /// Cancels every tracked subscription and forgets it.
  void _clear() {
    for (final subscription in _subscriptions.values) {
      subscription.cancel();
    }
    _subscriptions.clear();
  }

  /// Starts forwarding events from [rxObservable] into this observer's
  /// subject.
  ///
  /// Does nothing if [rxObservable] is already tracked.
  void addListener(rx.Observable rxObservable) {
    if (_subscriptions.containsKey(rxObservable)) {
      return;
    }
    // skip(1) drops the first event delivered to the new subscription.
    // NOTE(review): presumably this is the current value a BehaviorSubject
    // replays on subscribe — confirm against the rxdart version in use.
    _subscriptions[rxObservable] = rxObservable.skip(1).listen((data) {
      _subject.add(data);
    });
  }

  /// Returns a [StreamBuilder] bound to this observer's subject whose builder
  /// produces its widget by calling [cb].
  ///
  /// Each time the builder runs, the previous subscriptions are dropped and
  /// this observer is installed as [currentObserver] for the duration of [cb],
  /// so the streams read inside [cb] are tracked afresh.
  StreamBuilder getStreamBuilder(Widget Function() cb) {
    return StreamBuilder(
      stream: _subject.stream,
      builder: (_, __) {
        _clear();
        final previousObserver = currentObserver;
        currentObserver = this;
        try {
          return cb();
        } finally {
          // Restore even when cb throws; otherwise later reads would keep
          // registering on this observer.
          currentObserver = previousObserver;
        }
      },
    );
  }
}
/// Creates a fresh [Observer] and returns the [StreamBuilder] it produces for
/// [cb].
StreamBuilder observe(Widget Function() cb) {
  final observer = Observer();
  return observer.getStreamBuilder(cb);
}
/// A mutable value of type [T] held in a seeded [rx.BehaviorSubject].
class Observable<T> {
  /// Creates an observable whose subject is seeded with [initialValue].
  Observable(T initialValue)
      : _subject = rx.BehaviorSubject<T>.seeded(initialValue);

  /// Subscription to the stream most recently passed to [setStream], if any.
  StreamSubscription<T> _stream;

  /// Holds the current value and publishes every change.
  final rx.BehaviorSubject<T> _subject;

  /// The stream of this observable's subject.
  rx.Observable<T> get $stream => _subject.stream;

  /// Returns the current value, registering [$stream] with [currentObserver]
  /// when one is set.
  T get() {
    currentObserver?.addListener($stream);
    return _subject.value;
  }

  /// Pipes values from [stream] into this observable.
  ///
  /// Any subscription made by an earlier call is cancelled first. Passing
  /// null only cancels.
  void setStream(Stream<T> stream) {
    _stream?.cancel();
    if (stream != null) {
      _stream = stream.listen((value) {
        _subject.add(value);
      });
    }
  }

  /// Publishes [newValue] as the new value.
  void set(T newValue) {
    _subject.add(newValue);
  }

  /// Publishes the result of applying [cb] to the current value.
  void change(T Function(T) cb) {
    final next = cb(_subject.value);
    _subject.add(next);
  }
}
/// A cached value derived from other observables by calling [cb].
///
/// The callback is re-run only on the first [get] after one of the streams it
/// read has emitted.
class Computed<T> extends Observer {
  /// Produces the value; streams read inside it become dependencies.
  T Function() cb;

  /// Whether [cb] must be re-run on the next [get].
  bool _isDirty = true;

  /// Result of the most recent [cb] run.
  T _cachedResult;

  /// Creates a computed value that is marked dirty whenever an event arrives
  /// on its own subject.
  Computed(this.cb) : super() {
    _subject.stream.listen((_) {
      _isDirty = true;
    });
  }

  /// Returns the cached value, recomputing it first when dirty.
  ///
  /// When a [currentObserver] is set, that observer is also subscribed to this
  /// computed's subject, at most once per observer.
  T get() {
    if (_isDirty) {
      _clear();
      final previousObserver = currentObserver;
      currentObserver = this;
      try {
        _cachedResult = cb();
      } finally {
        // Restore even when cb throws so later reads are not attributed to
        // this computed.
        currentObserver = previousObserver;
      }
      _isDirty = false;
    }

    final observer = currentObserver;
    final stream = _subject.stream;
    // Check the reading observer's map, not this computed's own: otherwise a
    // second read in the same pass replaces the stored subscription without
    // cancelling it.
    // NOTE(review): a BehaviorSubject replays its latest value to new
    // listeners, so subscribing after this computed has emitted may notify the
    // observer immediately — verify this does not cause redundant rebuilds.
    if (observer != null && !observer._subscriptions.containsKey(stream)) {
      observer._subscriptions[stream] = stream.listen((data) {
        observer._subject.add(data);
      });
    }
    return _cachedResult;
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment