Skip to content

Instantly share code, notes, and snippets.

@jodinathan
Created September 23, 2020 13:36
Show Gist options
  • Save jodinathan/fe44d27f9e7afb29e822617e336ce79f to your computer and use it in GitHub Desktop.
Save jodinathan/fe44d27f9e7afb29e822617e336ce79f to your computer and use it in GitHub Desktop.
Stream<T> transformInto<T, Y>(Stream<Y> stream, T Function(Y item) fn,
{bool? cancelOnError}) {
StreamSubscription<T> _change(Stream<Y> input, bool cancelOnError) {
late StreamSubscription<Y> subscription;
// Create controller that forwards pause, resume and cancel events.
final controller = StreamController<T>(
onPause: () {
subscription.pause();
},
onResume: () {
subscription.resume();
},
onCancel: () => subscription.cancel(),
sync: true); // "sync" is correct here, since events are forwarded.
// Listen to the provided stream using `cancelOnError`.
subscription = input.listen((data) {
controller.add(fn(data));
},
onError: controller.addError,
onDone: controller.close,
cancelOnError: cancelOnError);
return controller.stream.listen(null);
}
// Return a new [StreamSubscription] by listening to the controller's
// stream.
return stream.transform(
StreamTransformer<Y, T>(_change));
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment