Created
August 19, 2020 19:03
-
-
Save shinayser/519dedda4626e0c80203bc0673880963 to your computer and use it in GitHub Desktop.
This transformer splits the source stream's data events among all of its listeners.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import 'dart:async'; | |
/// Adds [splitLoad] to every [Stream].
extension LoadSplitterTransformerExtension<T> on Stream<T> {
  /// Returns a stream that hands each data event to only one of its
  /// listeners.
  ///
  /// Error events are still delivered to every listener.
  ///
  /// ### Example
  ///
  /// ```dart
  /// var split = Stream.fromIterable([1, 2, 3]).splitLoad();
  ///
  /// split.listen((event) => print('GOT $event ON A'));
  /// split.listen((event) => print('GOT $event ON B'));
  /// split.listen((event) => print('GOT $event ON C'));
  /// ```
  ///
  /// Prints:
  ///
  /// ```text
  /// GOT 1 ON A
  /// GOT 2 ON B
  /// GOT 3 ON C
  /// ```
  Stream<T> splitLoad() {
    return transform<T>(_LoadSplitterTransformer<T>());
  }
}
/// A [StreamTransformer] whose [bind] wraps the source stream in a
/// [_LoadSplitterStream].
class _LoadSplitterTransformer<T> extends StreamTransformerBase<T, T> {
  /// Optional flag supplied at construction; nothing in this class reads it.
  bool cancelOnError;

  /// The stream most recently passed to [bind].
  Stream<T> _source;

  /// The splitter most recently returned from [bind].
  _LoadSplitterStream<T> _loadBalancedStream;

  _LoadSplitterTransformer({this.cancelOnError});

  /// Remembers [stream] and returns a new [_LoadSplitterStream] built on it.
  @override
  Stream<T> bind(Stream<T> stream) {
    _source = stream;
    return _loadBalancedStream = _LoadSplitterStream<T>(stream);
  }
}
/// A multi-listener stream that distributes the data events of [_source]
/// round-robin across its current listeners.
///
/// Every call to [listen] gets its own [StreamController]. Each data event is
/// added to exactly one controller that is not paused; error events and the
/// done event are forwarded to all of them. A data event that arrives while
/// there are no listeners, or while every listener is paused, is dropped.
class _LoadSplitterStream<T> extends Stream<T> {
  /// One controller per active listener, in subscription order.
  final _controllers = <StreamController<T>>[];

  /// The stream whose events are being distributed.
  final Stream<T> _source;

  /// The active subscription to [_source], or `null` when not subscribed.
  StreamSubscription<T> _sourceSubscription;

  /// Index into the listener list of the next listener to be offered a data
  /// event.
  int nextControllerIndex = 0;

  /// Creates the splitter and subscribes to [_source] immediately.
  _LoadSplitterStream(this._source) {
    _listenSource();
  }

  /// (Re)subscribes to [_source], cancelling any previous subscription first.
  void _listenSource() {
    _sourceSubscription?.cancel();
    _sourceSubscription = _source.listen(
      _dispatch,
      onError: (Object error, StackTrace stackTrace) {
        // Iterate over a snapshot so the list cannot change underneath us.
        for (final controller in List.of(_controllers)) {
          controller.addError(error, stackTrace);
        }
      },
      onDone: () {
        for (final controller in List.of(_controllers)) {
          controller.close();
        }
      },
    );
  }

  /// Adds [event] to the next controller in rotation that is not paused.
  ///
  /// Makes at most one full pass over the controllers. If there are none, or
  /// all of them are paused, the event is dropped and [nextControllerIndex]
  /// ends up where it started.
  void _dispatch(T event) {
    final count = _controllers.length;
    // Defensive: never index past the end, whatever happened to the list.
    if (nextControllerIndex >= count) nextControllerIndex = 0;
    for (var attempt = 0; attempt < count; attempt++) {
      final candidate = _controllers[nextControllerIndex];
      nextControllerIndex = (nextControllerIndex + 1) % count;
      if (!candidate.isPaused) {
        candidate.add(event);
        return;
      }
    }
  }

  /// Removes [controller] from the rotation and keeps [nextControllerIndex]
  /// pointing at the same "next" listener (or wraps it to the start).
  ///
  /// When the last listener goes away the source subscription is cancelled.
  void _detach(StreamController<T> controller) {
    final removedAt = _controllers.indexOf(controller);
    if (removedAt >= 0) {
      _controllers.removeAt(removedAt);
      // Everything after the removed slot shifted down by one.
      if (removedAt < nextControllerIndex) nextControllerIndex--;
      // Without this, the next data event would index out of range.
      if (nextControllerIndex >= _controllers.length) nextControllerIndex = 0;
    }
    if (_controllers.isEmpty) {
      _sourceSubscription?.cancel();
      _sourceSubscription = null;
    }
  }

  /// Registers a new listener backed by its own asynchronous controller.
  ///
  /// [onData], [onError], [onDone] and [cancelOnError] are all passed on to
  /// the controller's stream subscription, which is returned.
  @override
  StreamSubscription<T> listen(
    void Function(T event) onData, {
    Function onError,
    void Function() onDone,
    bool cancelOnError,
  }) {
    StreamController<T> controller;
    controller = StreamController<T>(
      sync: false,
      onListen: () {
        // Resubscribe if a previous "last listener cancelled" tore it down.
        if (_sourceSubscription == null) {
          _listenSource();
        }
      },
      onCancel: () {
        _detach(controller);
      },
    );
    _controllers.add(controller);
    return controller.stream.listen(
      onData,
      onError: onError,
      // Previously omitted, so callers never received the done event.
      onDone: onDone,
      cancelOnError: cancelOnError,
    );
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment