Skip to content

Instantly share code, notes, and snippets.

@hoc081098
Created March 30, 2024 11:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hoc081098/29cbb70d8fa8023d09f860daf602205f to your computer and use it in GitHub Desktop.
Save hoc081098/29cbb70d8fa8023d09f860daf602205f to your computer and use it in GitHub Desktop.
import 'dart:async';
extension SplitExtension<T> on Stream<T> {
Stream<T> split() {
final controllers = <MultiStreamController<T>>[];
var index = -1;
var done = false;
StreamSubscription<T> subscription;
return Stream.multi(
(controller) {
if (done) {
return controller.closeSync();
}
final wasEmpty = controllers.isEmpty;
controllers.add(controller);
if (wasEmpty) {
subscription = listen(
(value) {
if (controllers.every((s) => s.isPaused)) {
return;
}
while (true) {
index = (index + 1) % controllers.length;
final controller = controllers[index];
if (!controller.isPaused) {
controller.addSync(value);
return;
}
}
},
onError: (e, StackTrace st) {
if (controllers.every((s) => s.isPaused)) {
return;
}
while (true) {
index = (index + 1) % controllers.length;
final controller = controllers[index];
if (!controller.isPaused) {
controller.addErrorSync(e, st);
return;
}
}
},
onDone: () {
done = true;
controllers.forEach((c) {
c.onCancel = null;
c.closeSync();
});
controllers.clear();
},
);
}
controller.onCancel = () {
controllers.remove(controller);
if (controllers.isEmpty) {
subscription?.cancel();
subscription = null;
done = true;
}
};
},
isBroadcast: true,
);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment