Skip to content

Instantly share code, notes, and snippets.

@Davenchy
Last active November 8, 2021 22:23
Show Gist options
  • Save Davenchy/b31c11d8eee06f85ce9eb88638a0cb2e to your computer and use it in GitHub Desktop.
Save Davenchy/b31c11d8eee06f85ce9eb88638a0cb2e to your computer and use it in GitHub Desktop.
Dart EventEmitter
import 'dart:async';
/// Signature of a callback that receives a single event of type [E].
typedef EventHandler<E> = void Function(E event);

/// Signature of the function returned when a handler is registered.
///
/// Calling it stops the associated handler; the returned future completes
/// once the cancellation has finished.
typedef HandlerCanceler = Future<void> Function();
/// A typed event bus built on top of a broadcast [StreamController].
///
/// Events of any subtype of [T] can be published with [emit]. Consumers
/// either subscribe to a subtype with [handle], wait for a single occurrence
/// with [onNextEmit], or work with the raw [stream] / [filteredStream].
class EventEmitter<T> {
  final StreamController<T> _controller = StreamController.broadcast();

  /// Every subscription created by [handle] or [onNextEmit] that has not been
  /// cancelled yet.
  final List<StreamSubscription<T>> _subscriptions = [];

  /// Pending [onNextEmit] waiters, each mapped to the subscription feeding it.
  final Map<Completer<void>, StreamSubscription<T>> _waiters = {};

  /// The broadcast stream carrying every emitted event.
  Stream<T> get stream => _controller.stream;

  /// Publishes [event] to all current listeners.
  ///
  /// Throws a [StateError] if the emitter has already been destroyed, because
  /// the underlying controller is closed at that point.
  void emit<E extends T>(E event) => _controller.add(event);

  /// Returns a view of [stream] that only contains events of type [E].
  Stream<E> filteredStream<E extends T>() =>
      stream.where((event) => event is E).cast<E>();

  /// Registers [handler] for every future event of type [E].
  ///
  /// Returns a function that removes the handler again. Calling that function
  /// more than once is harmless; only the first call cancels the subscription.
  HandlerCanceler handle<E extends T>(EventHandler<E> handler) {
    final subscription = filteredStream<E>().listen(handler);
    _subscriptions.add(subscription);
    var isCanceled = false;
    return () async {
      if (isCanceled) return;
      isCanceled = true;
      _subscriptions.remove(subscription);
      await subscription.cancel();
    };
  }

  /// Returns a future that completes on the next event of type [E].
  ///
  /// The internal subscription is one-shot: it is cancelled as soon as the
  /// first matching event arrives, so later events neither leak a listener
  /// nor try to complete the same completer twice. The future also completes
  /// when [completeCompleters] or [destroy] is called while it is pending.
  Future<void> onNextEmit<E extends T>() {
    final completer = Completer<void>();
    // Listen first and attach the callback afterwards so the callback can
    // refer to its own subscription through `_waiters`.
    final subscription = filteredStream<E>().listen(null);
    subscription.onData((_) => _release(completer));
    _subscriptions.add(subscription);
    _waiters[completer] = subscription;
    return completer.future;
  }

  /// Completes every future handed out by [onNextEmit] that is still pending
  /// and cancels the subscriptions backing them.
  void completeCompleters() {
    // Iterate over a snapshot because `_release` mutates `_waiters`.
    for (final completer in _waiters.keys.toList()) {
      _release(completer);
    }
  }

  /// Cancels every subscription created by [handle] or [onNextEmit].
  ///
  /// Futures returned by [onNextEmit] stay pending until
  /// [completeCompleters] is called.
  Future<void> clearHandlers() async {
    if (_subscriptions.isEmpty) return;
    // Detach the list before awaiting: a canceler returned by [handle] may
    // run while the cancellations are in flight and would otherwise modify
    // the list during iteration.
    final active = List.of(_subscriptions);
    _subscriptions.clear();
    await Future.wait(active.map((subscription) => subscription.cancel()));
  }

  /// Shuts the emitter down.
  ///
  /// Closes the controller (events emitted beforehand are still delivered),
  /// then cancels all handlers and finally releases every pending
  /// [onNextEmit] future. [emit] must not be called afterwards.
  Future<void> destroy() async {
    await _controller.close();
    await clearHandlers();
    completeCompleters();
  }

  /// Tears down the waiter identified by [completer] and completes it if it
  /// has not been completed already.
  void _release(Completer<void> completer) {
    final subscription = _waiters.remove(completer);
    if (subscription != null) {
      _subscriptions.remove(subscription);
      subscription.cancel();
    }
    if (!completer.isCompleted) completer.complete();
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment