Skip to content

Instantly share code, notes, and snippets.

@long1eu
Created June 23, 2018 02:10
Show Gist options
  • Save long1eu/db508dcf2c1a28ec226a939ca5fe28e0 to your computer and use it in GitHub Desktop.
Save long1eu/db508dcf2c1a28ec226a939ca5fe28e0 to your computer and use it in GitHub Desktop.
import 'dart:async';
import 'package:rxdart/src/streams/utils.dart';
typedef Stream<T> RetryWhenStreamFactory<T>(dynamic error, StackTrace s);
class RetryWhenStream<T> extends Stream<T> {
final StreamFactory<T> streamFactory;
final RetryWhenStreamFactory<T> retryWhenFactory;
StreamController<T> controller;
StreamSubscription<T> subscription;
bool _isUsed = false;
final List<ErrorAndStacktrace> _errors = <ErrorAndStacktrace>[];
RetryWhenStream(this.streamFactory, this.retryWhenFactory);
@override
StreamSubscription<T> listen(
void onData(T event), {
Function onError,
void onDone(),
bool cancelOnError,
}) {
if (_isUsed) throw new StateError('Stream has already been listened to.');
_isUsed = true;
controller = new StreamController<T>(
sync: true,
onListen: retry,
onPause: ([Future<dynamic> resumeSignal]) => subscription.pause(resumeSignal),
onResume: () => subscription.resume(),
onCancel: () => subscription.cancel());
return controller.stream.listen(
onData,
onError: onError,
onDone: onDone,
cancelOnError: cancelOnError,
);
}
void retry() {
subscription = streamFactory().listen(controller.add, onError: (dynamic e, StackTrace s) {
subscription.cancel();
_errors.add(new ErrorAndStacktrace(e, s));
retryWhenFactory(e, s).listen(
(dynamic event) => retry(),
onError: (dynamic e, StackTrace s) {
controller.addError(new RetryError(e.toString(), _errors..add(new ErrorAndStacktrace(e, s))));
controller.close();
},
);
}, onDone: controller.close, cancelOnError: false);
}
}
class RetryError extends Error {
final String message;
final List<ErrorAndStacktrace> errors;
RetryError(this.message, this.errors);
@override
String toString() => message;
}
class ErrorAndStacktrace {
final dynamic error;
final StackTrace stacktrace;
ErrorAndStacktrace(this.error, this.stacktrace);
@override
String toString() {
return 'ErrorAndStacktrace{error: $error, stacktrace: $stacktrace}';
}
@override
bool operator ==(Object other) =>
identical(this, other) ||
other is ErrorAndStacktrace && runtimeType == other.runtimeType && error == other.error && stacktrace == other.stacktrace;
@override
int get hashCode => error.hashCode ^ stacktrace.hashCode;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment