Skip to content

Instantly share code, notes, and snippets.

@hoc081098
Created December 17, 2022 11:18
Show Gist options
  • Save hoc081098/31aa9bd106ddbe854cb31b138e724a5c to your computer and use it in GitHub Desktop.
Save hoc081098/31aa9bd106ddbe854cb31b138e724a5c to your computer and use it in GitHub Desktop.
bufferUntil
/// Adds [bufferUntil] to every [Stream].
extension BufferUntilStreamExtension<T> on Stream<T> {
  /// Holds back this stream's events until [trigger] emits its first event,
  /// then replays the held events and forwards everything that follows.
  ///
  /// This stream is subscribed to as soon as [bufferUntil] is called, not when
  /// the returned stream is listened to. Its data, error and done events are
  /// pushed into an intermediate single-subscription controller, which keeps
  /// them queued because nothing listens to it until [trigger] fires.
  ///
  /// The returned stream is single-subscription. Cancelling its subscription
  /// cancels every subscription created here. Pause and resume are wired only
  /// when this stream is not a broadcast stream, and they act on the
  /// forwarding subscription only; the subscription to this stream keeps
  /// running.
  ///
  /// NOTE(review): `CompositeSubscription`, `addTo` and `exhaustMap` are not
  /// declared in this file; presumably they come from `package:rxdart`.
  Stream<T> bufferUntil(Stream<void> trigger) {
    final subscriptions = CompositeSubscription();

    // Events from this stream wait here until the trigger opens the gate.
    final pending = StreamController<T>(sync: true);

    // What the caller ultimately listens to.
    final output = StreamController<T>(sync: true);

    // Start collecting right away so nothing emitted from now on is missed.
    listen(
      pending.add,
      onError: pending.addError,
      onDone: pending.close,
    ).addTo(subscriptions);

    // Runs when the returned stream gets its listener.
    void startForwarding() {
      // Only the first trigger event matters; once it arrives, switch over to
      // the pending events and pipe them into the output controller.
      final gated = trigger.take(1).exhaustMap((_) => pending.stream);
      final forwarding = gated.listen(
        output.add,
        onError: output.addError,
        onDone: output.close,
      )..addTo(subscriptions);

      if (!isBroadcast) {
        output
          ..onPause = forwarding.pause
          ..onResume = forwarding.resume;
      }
    }

    output.onListen = startForwarding;
    output.onCancel = subscriptions.cancel;
    return output.stream;
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment