Skip to content

Instantly share code, notes, and snippets.

@hoc081098
Created December 17, 2022 11:24
Show Gist options
  • Save hoc081098/83df08a0cdb36827ddcba1b5ead15ad9 to your computer and use it in GitHub Desktop.
Save hoc081098/83df08a0cdb36827ddcba1b5ead15ad9 to your computer and use it in GitHub Desktop.
bufferUntil_example.dart
import 'package:rxdart/rxdart.dart';
import 'dart:async';
Stream<int> dataStream() =>
Stream.periodic(const Duration(milliseconds: 500), (i) => i);
void main() async {
final trigger = PublishSubject<void>();
final sub = dataStream().bufferUntil(trigger).listen(print);
await Future<void>.delayed(const Duration(seconds: 3));
trigger.add(null);
await Future<void>.delayed(const Duration(seconds: 2));
sub.cancel().ignore();
}
extension BufferUntilStreamExtension<T> on Stream<T> {
Stream<T> bufferUntil(Stream<void> trigger) {
final controller = StreamController<T>(sync: true);
final bag = CompositeSubscription();
// send source events to buffer controller.
final bufferController = StreamController<T>(sync: true);
listen(
bufferController.add,
onError: bufferController.addError,
onDone: bufferController.close,
).addTo(bag);
controller.onListen = () {
// when trigger fires, send buffer controller events to output controller.
final subscription =
trigger.take(1).exhaustMap((_) => bufferController.stream).listen(
controller.add,
onError: controller.addError,
onDone: controller.close,
)..addTo(bag);
if (!isBroadcast) {
controller.onPause = subscription.pause;
controller.onResume = subscription.resume;
}
};
controller.onCancel = bag.cancel;
return controller.stream;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment