Skip to content

Instantly share code, notes, and snippets.

@shinayser
Created August 19, 2020 19:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save shinayser/519dedda4626e0c80203bc0673880963 to your computer and use it in GitHub Desktop.
Save shinayser/519dedda4626e0c80203bc0673880963 to your computer and use it in GitHub Desktop.
This transformer splits the source stream's data events among all of its listeners.
import 'dart:async';
extension LoadSplitterTransformerExtension<T> on Stream<T> {
  /// Returns a stream that shares out this stream's data events among its
  /// listeners instead of repeating every event to each of them.
  ///
  /// Each data event is handed to a single listener. Error events are not
  /// split: every listener receives them.
  ///
  /// ### Example
  ///
  ///     var split = Stream.fromIterable([1, 2, 3]).splitLoad();
  ///
  ///     split.listen((event) => print('GOT $event ON A'));
  ///     split.listen((event) => print('GOT $event ON B'));
  ///     split.listen((event) => print('GOT $event ON C'));
  ///
  /// Prints:
  ///
  ///     GOT 1 ON A
  ///     GOT 2 ON B
  ///     GOT 3 ON C
  Stream<T> splitLoad() {
    final splitter = _LoadSplitterTransformer<T>();
    return transform(splitter);
  }
}
/// A [StreamTransformer] whose [bind] wraps the given stream in a
/// [_LoadSplitterStream].
///
/// The transformer keeps no reference to the streams it binds, so a single
/// instance can be bound to several sources independently.
class _LoadSplitterTransformer<T> extends StreamTransformerBase<T, T> {
  /// Not consulted by [bind]; retained so existing constructor calls that
  /// pass it keep compiling.
  bool cancelOnError;

  _LoadSplitterTransformer({this.cancelOnError});

  /// Returns a new [_LoadSplitterStream] reading from [stream].
  @override
  Stream<T> bind(Stream<T> stream) => _LoadSplitterStream<T>(stream);
}
/// A stream that accepts any number of listeners and hands each data event of
/// [_source] to exactly one of them, chosen in round-robin order.
///
/// * Paused listeners are skipped. If there are no listeners, or all of them
///   are paused, the data event is dropped.
/// * Error events are forwarded to every current listener.
/// * When the source completes, every current listener is closed; listeners
///   attached afterwards receive a done event straight away.
///
/// The source is subscribed to as soon as this object is constructed, so
/// events emitted before the first [listen] call are dropped.
///
/// NOTE(review): once every listener has cancelled, the source subscription
/// is cancelled and a later [listen] subscribes to the source again. That
/// presumably only works for sources that allow more than one subscription —
/// confirm against the intended sources.
class _LoadSplitterStream<T> extends Stream<T> {
  /// One controller per active listener, in subscription order.
  final _controllers = <StreamController<T>>[];

  final Stream<T> _source;

  /// Active subscription to [_source]; `null` after all listeners cancelled.
  StreamSubscription<T> _sourceSubscription;

  /// Whether [_source] has delivered its done event.
  bool _sourceDone = false;

  /// Index into [_controllers] of the listener offered the next data event.
  int nextControllerIndex = 0;

  _LoadSplitterStream(this._source) {
    _listenSource();
  }

  /// (Re)subscribes to [_source], cancelling any previous subscription.
  void _listenSource() {
    _sourceSubscription?.cancel();
    _sourceSubscription = _source.listen(
      _dispatch,
      onError: (Object err, StackTrace stack) {
        // Iterate over a snapshot so a listener removing itself while errors
        // are being forwarded cannot disturb the iteration.
        for (final controller in _controllers.toList()) {
          controller.addError(err, stack);
        }
      },
      onDone: () {
        _sourceDone = true;
        for (final controller in _controllers.toList()) {
          controller.close();
        }
      },
    );
  }

  /// Delivers [event] to the first non-paused listener at or after
  /// [nextControllerIndex], then advances the index past that listener.
  ///
  /// Leaves [nextControllerIndex] untouched and drops [event] when there is
  /// no listener able to receive it.
  void _dispatch(T event) {
    final count = _controllers.length;
    if (count == 0) return;

    // Probe every listener at most once, wrapping around the list. The modulo
    // also protects against an index left stale by a removed listener.
    for (var probed = 0; probed < count; probed++) {
      final index = (nextControllerIndex + probed) % count;
      final candidate = _controllers[index];
      if (candidate.isPaused) continue;

      nextControllerIndex = (index + 1) % count;
      candidate.add(event);
      return;
    }
  }

  /// Registers a new listener that takes part in the round-robin.
  ///
  /// [onData], [onError], [onDone] and [cancelOnError] have the same meaning
  /// as in [Stream.listen]. If the source has already completed, the returned
  /// subscription only delivers a done event.
  @override
  StreamSubscription<T> listen(
    void Function(T event) onData, {
    Function onError,
    void Function() onDone,
    bool cancelOnError,
  }) {
    if (_sourceDone) {
      // Nothing more will ever arrive: behave like an already-closed stream
      // instead of leaving the listener waiting or re-listening to a finished
      // source.
      return Stream<T>.empty().listen(
        onData,
        onError: onError,
        onDone: onDone,
        cancelOnError: cancelOnError,
      );
    }

    StreamController<T> newController;
    newController = StreamController<T>(
      onListen: () {
        if (_sourceSubscription == null) {
          _listenSource();
        }
      },
      onCancel: () {
        final removedAt = _controllers.indexOf(newController);
        if (removedAt < 0) return;
        _controllers.removeAt(removedAt);

        // Keep the cursor on the same "next" listener after the list shrinks.
        if (removedAt < nextControllerIndex) {
          nextControllerIndex--;
        }

        if (_controllers.isEmpty) {
          nextControllerIndex = 0;
          _sourceSubscription?.cancel();
          _sourceSubscription = null;
        } else if (nextControllerIndex >= _controllers.length) {
          nextControllerIndex = 0;
        }
      },
    );
    _controllers.add(newController);

    return newController.stream.listen(
      onData,
      onError: onError,
      onDone: onDone,
      cancelOnError: cancelOnError,
    );
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment