Created
January 4, 2022 18:43
-
-
Save cranst0n/a8d3a83d5be0f1d7dd1aaa9a399cfd8c to your computer and use it in GitHub Desktop.
dartz Task retry
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
part of dartz; | |
/// Mostly complete port of cats-retry to dartz. | |
class RetryStatus { | |
/// Retries attempted thus far. | |
final int retriesSoFar; | |
/// Total delay between successive retries. | |
final Duration cumulativeDelay; | |
/// Delay taken before attempting the previous retry. | |
final Option<Duration> previousDelay; | |
const RetryStatus( | |
this.retriesSoFar, | |
this.cumulativeDelay, | |
this.previousDelay, | |
); | |
/// Create initial status used when running first task attempt. | |
factory RetryStatus.initial() => RetryStatus(0, Duration.zero, none()); | |
/// Create new status that indicates an additional retry was taken after | |
/// the given delay. | |
RetryStatus retryAfter(Duration delay) => | |
RetryStatus(retriesSoFar + 1, cumulativeDelay + delay, some(delay)); | |
} | |
abstract class RetryDecision { | |
const RetryDecision._(); | |
/// Indicates if this decision is to give up retrying. | |
bool get isGivingUp; | |
/// Delay to take before the next retry. | |
Duration get delay; | |
/// Create a decision to give up and stop retrying. | |
factory RetryDecision.giveUp() => _GiveUp(); | |
/// Create a decision to retry the task after the provided delay. | |
factory RetryDecision.delayAndRetry(Duration delay) => | |
_DelayAndRetry._(delay); | |
/// Builds [RetryDetails] from the given status. | |
RetryDetails _detailsFromStatus(RetryStatus status) { | |
return RetryDetails( | |
status.retriesSoFar + (isGivingUp ? 0 : 1), | |
status.cumulativeDelay + delay, | |
isGivingUp, | |
some(delay).filter((_) => !isGivingUp), | |
); | |
} | |
RetryStatus _updateStatus(RetryStatus status) => | |
isGivingUp ? status : status.retryAfter(delay); | |
} | |
class _GiveUp extends RetryDecision { | |
static final _GiveUp _singleton = _GiveUp._(); | |
const _GiveUp._() : super._(); | |
factory _GiveUp() => _singleton; | |
@override | |
bool get isGivingUp => true; | |
@override | |
Duration get delay => Duration.zero; | |
@override | |
String toString() => 'RetryDecision.GiveUp'; | |
@override | |
bool operator ==(Object other) => identical(this, other) || other is _GiveUp; | |
@override | |
int get hashCode => _singleton.hashCode; | |
} | |
class _DelayAndRetry extends RetryDecision { | |
@override | |
final Duration delay; | |
const _DelayAndRetry._(this.delay) : super._(); | |
@override | |
bool get isGivingUp => false; | |
@override | |
String toString() => 'RetryDecision.DelayAndRetry($delay)'; | |
@override | |
bool operator ==(Object other) => | |
identical(this, other) || | |
(other is _DelayAndRetry && other.delay == delay); | |
@override | |
int get hashCode => delay.hashCode; | |
} | |
class RetryPolicy { | |
final Function1<RetryStatus, RetryDecision> decideOn; | |
RetryPolicy(this.decideOn); | |
// construction | |
factory RetryPolicy.alwaysGiveUp() => | |
RetryPolicy((_) => RetryDecision.giveUp()); | |
factory RetryPolicy.limitRetries(int maxRetries) => RetryPolicy((status) { | |
if (status.retriesSoFar >= maxRetries) { | |
return RetryDecision.giveUp(); | |
} else { | |
return RetryDecision.delayAndRetry(Duration.zero); | |
} | |
}); | |
factory RetryPolicy.constantDelay(Duration delay) => | |
RetryPolicy((status) => RetryDecision.delayAndRetry(delay)); | |
factory RetryPolicy.exponentialBackoff(Duration baseDelay) => | |
RetryPolicy((status) => RetryDecision.delayAndRetry( | |
_safeMultiply(baseDelay, BigInt.from(2).pow(status.retriesSoFar)))); | |
/// See https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/ | |
factory RetryPolicy.fullJitter(Duration baseDelay) => RetryPolicy((status) { | |
final maxDelay = | |
_safeMultiply(baseDelay, BigInt.from(2).pow(status.retriesSoFar)); | |
return RetryDecision.delayAndRetry( | |
maxDelay * new Random().nextDouble()); | |
}); | |
static Duration _safeMultiply(Duration duration, BigInt factor) { | |
final micros = BigInt.from(duration.inMicroseconds) * factor; | |
return Duration( | |
microseconds: micros.isValidInt ? micros.toInt() : 9223372036854775807, | |
); | |
} | |
// combinators | |
RetryPolicy capDelay(Duration maxDelay) => | |
mapDelay((d) => d > maxDelay ? maxDelay : d); | |
RetryPolicy giveUpAfterDelay(Duration cumulativeDelay) => | |
RetryPolicy((status) => | |
status.previousDelay.getOrElse(() => Duration.zero) >= cumulativeDelay | |
? RetryDecision.giveUp() | |
: decideOn(status)); | |
RetryPolicy giveUpAfterCumulativeDelay(Duration cumulativeDelay) => | |
RetryPolicy((status) => status.cumulativeDelay >= cumulativeDelay | |
? RetryDecision.giveUp() | |
: decideOn(status)); | |
RetryPolicy followedBy(RetryPolicy policy) => RetryPolicy((status) { | |
final thisDecision = decideOn(status); | |
return thisDecision.isGivingUp ? thisDecision : policy.decideOn(status); | |
}); | |
RetryPolicy join(RetryPolicy policy) => RetryPolicy((status) { | |
final thisDecision = decideOn(status); | |
final thatDecision = policy.decideOn(status); | |
if (thisDecision.isGivingUp || thatDecision.isGivingUp) { | |
return RetryDecision.giveUp(); | |
} else { | |
return RetryDecision.delayAndRetry( | |
Duration( | |
microseconds: max( | |
thisDecision.delay.inMicroseconds, | |
thatDecision.delay.inMicroseconds, | |
), | |
), | |
); | |
} | |
}); | |
RetryPolicy meet(RetryPolicy policy) => RetryPolicy((status) { | |
final thisDecision = decideOn(status); | |
final thatDecision = policy.decideOn(status); | |
if (!thisDecision.isGivingUp && !thatDecision.isGivingUp) { | |
return RetryDecision.delayAndRetry( | |
Duration( | |
microseconds: min( | |
thisDecision.delay.inMicroseconds, | |
thatDecision.delay.inMicroseconds, | |
), | |
), | |
); | |
} else if (thisDecision.isGivingUp) { | |
return thatDecision; | |
} else if (thatDecision.isGivingUp) { | |
return thisDecision; | |
} else { | |
return RetryDecision.giveUp(); | |
} | |
}); | |
RetryPolicy mapDelay(Function1<Duration, Duration> f) => RetryPolicy( | |
(status) { | |
final decision = decideOn(status); | |
if (decision.isGivingUp) { | |
return RetryDecision.giveUp(); | |
} else { | |
return RetryDecision.delayAndRetry(f(decision.delay)); | |
} | |
}, | |
); | |
} | |
class RetryDetails { | |
final int retriesSoFar; | |
final Duration cumulativeDelay; | |
final bool givingUp; | |
final Option<Duration> upcomingDelay; | |
RetryDetails( | |
this.retriesSoFar, | |
this.cumulativeDelay, | |
this.givingUp, | |
this.upcomingDelay, | |
); | |
@override | |
String toString() => 'RetryDetails(' | |
'retriesSoFar = $retriesSoFar, ' | |
'cumulativeDelay = $cumulativeDelay, ' | |
'givingUp = $givingUp, ' | |
'upcomingDelay = $upcomingDelay)'; | |
} | |
extension RetryOps<A> on Task<A> { | |
Task<A> retrying( | |
RetryPolicy policy, { | |
Function1<A, bool>? wasSuccessful, | |
Function1<Object, bool>? isWorthRetrying, | |
Function2<Object, RetryDetails, Task<void>>? onError, | |
Function2<A, RetryDetails, Task<void>>? onFailure, | |
}) => | |
_retryingImpl( | |
policy, | |
wasSuccessful ?? (_) => true, | |
isWorthRetrying ?? (_) => true, | |
onError ?? (_, __) => Task.unit, | |
onFailure ?? (_, __) => Task.unit, | |
RetryStatus.initial(), | |
this, | |
); | |
Task<A> _retryingImpl( | |
RetryPolicy policy, | |
Function1<A, bool> wasSuccessful, | |
Function1<Object, bool> isWorthRetrying, | |
Function2<Object, RetryDetails, Task<void>> onError, | |
Function2<A, RetryDetails, Task<void>> onFailure, | |
RetryStatus status, | |
Task<A> action, | |
) { | |
return action.attempt().flatMap((result) { | |
return result.fold( | |
(err) => isWorthRetrying(err) | |
? _onFailureOrError(policy, wasSuccessful, isWorthRetrying, onError, | |
onFailure, status, action, (details) => onError(err, details)) | |
: Task.failed(err.toString()), | |
(a) => wasSuccessful(a) | |
? Task.value(a) | |
: _onFailureOrError(policy, wasSuccessful, isWorthRetrying, onError, | |
onFailure, status, action, (details) => onFailure(a, details)), | |
); | |
}); | |
} | |
Task<A> _onFailureOrError( | |
RetryPolicy policy, | |
Function1<A, bool> wasSuccessful, | |
Function1<Object, bool> isWorthRetrying, | |
Function2<Object, RetryDetails, Task<void>> onError, | |
Function2<A, RetryDetails, Task<void>> onFailure, | |
RetryStatus status, | |
Task<A> action, | |
Function1<RetryDetails, Task<void>> beforeRecurse, | |
) { | |
final decision = policy.decideOn(status); | |
final newStatus = decision._updateStatus(status); | |
final details = decision._detailsFromStatus(newStatus); | |
return Task.value(decision.isGivingUp).ifM( | |
Task.failed('Retry giving up.'), | |
beforeRecurse(details).andThen(Task.sleep(decision.delay)).andThen( | |
_retryingImpl(policy, wasSuccessful, isWorthRetrying, onError, | |
onFailure, newStatus, action)), | |
); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Timeout(Duration(minutes: 2)) | |
import 'package:test/test.dart'; | |
import 'package:dartz/dartz.dart'; | |
void main() { | |
test('RetryPolicy.limitRetries', () async { | |
final policy = RetryPolicy.limitRetries(10); | |
final status0 = RetryStatus(0, Duration.zero, none()); | |
final status10 = RetryStatus(10, Duration.zero, none()); | |
expect( | |
policy.decideOn(status0), RetryDecision.delayAndRetry(Duration.zero)); | |
expect(policy.decideOn(status10), RetryDecision.giveUp()); | |
}); | |
test('RetryPolicy.constantDelay', () async { | |
final delay = Duration(seconds: 42); | |
final policy = RetryPolicy.constantDelay(delay); | |
final status0 = RetryStatus(0, Duration.zero, none()); | |
final status9 = RetryStatus(9, Duration.zero, none()); | |
expect(policy.decideOn(status0), RetryDecision.delayAndRetry(delay)); | |
expect(policy.decideOn(status9), RetryDecision.delayAndRetry(delay)); | |
}); | |
test('RetryPolicy.exponentialBackoff', () async { | |
final baseDelay = Duration(seconds: 1); | |
final policy = RetryPolicy.exponentialBackoff(baseDelay); | |
final status0 = RetryStatus(0, Duration.zero, none()); | |
final status3 = RetryStatus(3, Duration.zero, none()); | |
expect(policy.decideOn(status0), RetryDecision.delayAndRetry(baseDelay)); | |
expect(policy.decideOn(status3), | |
RetryDecision.delayAndRetry(Duration(seconds: 8))); | |
}); | |
test('RetryPolicy.capDelay', () async { | |
final baseDelay = Duration(seconds: 1); | |
final maxDelay = Duration(minutes: 1); | |
final policy = RetryPolicy.exponentialBackoff(baseDelay).capDelay(maxDelay); | |
final status0 = RetryStatus(0, Duration.zero, none()); | |
final status3 = RetryStatus(3, Duration.zero, none()); | |
final status100 = RetryStatus(100, Duration.zero, none()); | |
expect(policy.decideOn(status0), RetryDecision.delayAndRetry(baseDelay)); | |
expect(policy.decideOn(status3), | |
RetryDecision.delayAndRetry(Duration(seconds: 8))); | |
expect(policy.decideOn(status100), RetryDecision.delayAndRetry(maxDelay)); | |
}); | |
test('RetryPolicy.stack safety', () async { | |
int count = 0; | |
final logFailure = Task.delay(() { | |
if (count % 100 == 0) { | |
print('count: $count'); | |
} | |
}); | |
final t = Task.delay(() => count++).retrying( | |
RetryPolicy.limitRetries(10000) | |
.join(RetryPolicy.constantDelay(Duration(milliseconds: 10))), | |
wasSuccessful: (count) => count > 9999, | |
onFailure: (_, __) => logFailure, | |
); | |
await t.timeout(Duration(minutes: 2)).run(); | |
expect(count, 10000); | |
}); | |
test("Task.retrying playground", () async { | |
var count = 0; | |
var errorsLogged = 0; | |
var failuresLogged = 0; | |
final logError = (err, details) => | |
Task.print('[${DateTime.now()}] err: $err / $details') | |
.andThen(Task.delay(() => errorsLogged++)); | |
final logFailure = (a, details) => | |
Task.print('[${DateTime.now()}] fail: $a / $details') | |
.andThen(Task.delay(() => failuresLogged++)); | |
final retryingTask = Task.delay<int>( | |
() => count++, | |
// () => throw FormatException('kaboom'), | |
).retrying( | |
RetryPolicy.limitRetries(10) | |
.join(RetryPolicy.exponentialBackoff(Duration(milliseconds: 5))), | |
// .join(RetryPolicy.fullJitter(Duration(milliseconds: 500))), | |
// .capDelay(Duration(milliseconds: 1200)), | |
wasSuccessful: (a) => a >= 10, | |
// retryOn: (err) => err is! FormatException, | |
onError: logError, | |
onFailure: logFailure, | |
); | |
final result = | |
await retryingTask.timeout(const Duration(seconds: 10)).attempt().run(); | |
// final result = await retryingTask.attempt().run(); | |
result.fold( | |
(err) { | |
expect(errorsLogged, 10); | |
}, | |
(a) { | |
expect(a, 10); | |
expect(errorsLogged, 0); | |
}, | |
); | |
}); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment