Skip to content

Instantly share code, notes, and snippets.

@long1eu
Last active May 30, 2018 13:42
Show Gist options
  • Save long1eu/9c9e257f111e0c8d3ba44cbf840a8767 to your computer and use it in GitHub Desktop.
Save long1eu/9c9e257f111e0c8d3ba44cbf840a8767 to your computer and use it in GitHub Desktop.
Dart retryWhen transformer
// File created by
// long1eu <home@long1.eu>
// on 30/05/2018
import 'dart:async';
/// Builds the stream that signals when emission may resume after an error.
///
/// The [error] stream passed in carries the single error reported by the
/// source stream.
typedef RetryWhenStreamFactory<T> = Stream<T> Function(Stream<Object> error);

/// Suspends forwarding when the source reports an error and resumes once the
/// stream returned by the supplied [RetryWhenStreamFactory] emits.
///
/// While no error is pending, every source event is forwarded and also
/// recorded in a buffer. When the source emits an error, forwarding stops
/// (source events arriving in the meantime are dropped) and the factory is
/// called with a one-element stream carrying that error. Each event of the
/// returned stream re-emits the buffered events, empties the buffer and
/// re-enables forwarding. An error on the returned stream is forwarded
/// downstream, after which the source is cancelled and the output is closed.
///
/// Note that the source stream is never re-subscribed.
class RetryWhenStreamTransformer<T> extends StreamTransformerBase<T, T> {
  /// The transformer that [bind] delegates to.
  final StreamTransformer<T, T> transformer;

  /// Creates a transformer driven by [streamFactory].
  ///
  /// Throws an [ArgumentError] if [streamFactory] is null.
  RetryWhenStreamTransformer(RetryWhenStreamFactory<T> streamFactory)
      : transformer = _buildTransformer(streamFactory);

  @override
  Stream<T> bind(Stream<T> stream) => transformer.bind(stream);

  static StreamTransformer<T, T> _buildTransformer<T>(
      RetryWhenStreamFactory<T> streamFactory) {
    if (streamFactory == null) {
      throw ArgumentError('streamFactory cannot be null');
    }

    return StreamTransformer<T, T>((Stream<T> input, bool cancelOnError) {
      // Events forwarded since the last replay; re-emitted on a retry signal.
      final List<T> events = <T>[];
      // Every live subscription to a factory-produced stream, kept so that
      // they can be cancelled together with the source.
      final List<StreamSubscription<T>> retries = <StreamSubscription<T>>[];
      final StreamController<T> controller = StreamController<T>(sync: true);
      // This callback runs when the bound stream is listened to, so the
      // source is subscribed right here; handlers are attached below.
      final StreamSubscription<T> source = input.listen(null);
      bool canEmit = true;

      // Cancels the source and every pending retry subscription.
      Future<void> cancelAll() {
        for (final StreamSubscription<T> retry in retries) {
          retry.cancel();
        }
        retries.clear();
        return source.cancel();
      }

      // Re-emits the buffered events and re-enables forwarding.
      void replayBuffered(T _) {
        if (controller.isClosed) {
          return;
        }
        // Copy before clearing: the controller is synchronous, so listeners
        // run while the events are being re-added.
        final List<T> replay = events.toList(growable: false);
        events.clear();
        replay.forEach(controller.add);
        canEmit = true;
      }

      // Stops forwarding and waits for the factory's stream to signal.
      void waitForRetry(Object error) {
        canEmit = false;
        final StreamSubscription<T> retry =
            streamFactory(Stream<Object>.fromIterable(<Object>[error]))
                .listen(null, cancelOnError: true);
        retries.add(retry);
        retry
          ..onData(replayBuffered)
          ..onError((Object retryError, StackTrace stackTrace) {
            controller.addError(retryError, stackTrace);
            cancelAll();
            controller.close();
          })
          ..onDone(() {
            retries.remove(retry);
          });
      }

      source
        ..onData((T event) {
          if (canEmit) {
            events.add(event);
            controller.add(event);
          }
        })
        ..onError(waitForRetry)
        ..onDone(() {
          // Without cancelling the retry subscriptions a late retry signal
          // would try to add events to the already closed controller.
          cancelAll();
          controller.close();
        });

      controller.onPause = source.pause;
      controller.onResume = source.resume;
      controller.onCancel = cancelAll;

      return controller.stream.listen(null, cancelOnError: cancelOnError);
    });
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment