Skip to content

Instantly share code, notes, and snippets.

@cranst0n
Created January 4, 2022 18:43
Show Gist options
  • Save cranst0n/a8d3a83d5be0f1d7dd1aaa9a399cfd8c to your computer and use it in GitHub Desktop.
Save cranst0n/a8d3a83d5be0f1d7dd1aaa9a399cfd8c to your computer and use it in GitHub Desktop.
dartz Task retry
part of dartz;
/// Mostly complete port of cats-retry to dartz.
class RetryStatus {
/// Retries attempted thus far.
final int retriesSoFar;
/// Total delay between successive retries.
final Duration cumulativeDelay;
/// Delay taken before attempting the previous retry.
final Option<Duration> previousDelay;
const RetryStatus(
this.retriesSoFar,
this.cumulativeDelay,
this.previousDelay,
);
/// Create initial status used when running first task attempt.
factory RetryStatus.initial() => RetryStatus(0, Duration.zero, none());
/// Create new status that indicates an additional retry was taken after
/// the given delay.
RetryStatus retryAfter(Duration delay) =>
RetryStatus(retriesSoFar + 1, cumulativeDelay + delay, some(delay));
}
abstract class RetryDecision {
const RetryDecision._();
/// Indicates if this decision is to give up retrying.
bool get isGivingUp;
/// Delay to take before the next retry.
Duration get delay;
/// Create a decision to give up and stop retrying.
factory RetryDecision.giveUp() => _GiveUp();
/// Create a decision to retry the task after the provided delay.
factory RetryDecision.delayAndRetry(Duration delay) =>
_DelayAndRetry._(delay);
/// Builds [RetryDetails] from the given status.
RetryDetails _detailsFromStatus(RetryStatus status) {
return RetryDetails(
status.retriesSoFar + (isGivingUp ? 0 : 1),
status.cumulativeDelay + delay,
isGivingUp,
some(delay).filter((_) => !isGivingUp),
);
}
RetryStatus _updateStatus(RetryStatus status) =>
isGivingUp ? status : status.retryAfter(delay);
}
class _GiveUp extends RetryDecision {
static final _GiveUp _singleton = _GiveUp._();
const _GiveUp._() : super._();
factory _GiveUp() => _singleton;
@override
bool get isGivingUp => true;
@override
Duration get delay => Duration.zero;
@override
String toString() => 'RetryDecision.GiveUp';
@override
bool operator ==(Object other) => identical(this, other) || other is _GiveUp;
@override
int get hashCode => _singleton.hashCode;
}
class _DelayAndRetry extends RetryDecision {
@override
final Duration delay;
const _DelayAndRetry._(this.delay) : super._();
@override
bool get isGivingUp => false;
@override
String toString() => 'RetryDecision.DelayAndRetry($delay)';
@override
bool operator ==(Object other) =>
identical(this, other) ||
(other is _DelayAndRetry && other.delay == delay);
@override
int get hashCode => delay.hashCode;
}
class RetryPolicy {
final Function1<RetryStatus, RetryDecision> decideOn;
RetryPolicy(this.decideOn);
// construction
factory RetryPolicy.alwaysGiveUp() =>
RetryPolicy((_) => RetryDecision.giveUp());
factory RetryPolicy.limitRetries(int maxRetries) => RetryPolicy((status) {
if (status.retriesSoFar >= maxRetries) {
return RetryDecision.giveUp();
} else {
return RetryDecision.delayAndRetry(Duration.zero);
}
});
factory RetryPolicy.constantDelay(Duration delay) =>
RetryPolicy((status) => RetryDecision.delayAndRetry(delay));
factory RetryPolicy.exponentialBackoff(Duration baseDelay) =>
RetryPolicy((status) => RetryDecision.delayAndRetry(
_safeMultiply(baseDelay, BigInt.from(2).pow(status.retriesSoFar))));
/// See https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/
factory RetryPolicy.fullJitter(Duration baseDelay) => RetryPolicy((status) {
final maxDelay =
_safeMultiply(baseDelay, BigInt.from(2).pow(status.retriesSoFar));
return RetryDecision.delayAndRetry(
maxDelay * new Random().nextDouble());
});
static Duration _safeMultiply(Duration duration, BigInt factor) {
final micros = BigInt.from(duration.inMicroseconds) * factor;
return Duration(
microseconds: micros.isValidInt ? micros.toInt() : 9223372036854775807,
);
}
// combinators
RetryPolicy capDelay(Duration maxDelay) =>
mapDelay((d) => d > maxDelay ? maxDelay : d);
RetryPolicy giveUpAfterDelay(Duration cumulativeDelay) =>
RetryPolicy((status) =>
status.previousDelay.getOrElse(() => Duration.zero) >= cumulativeDelay
? RetryDecision.giveUp()
: decideOn(status));
RetryPolicy giveUpAfterCumulativeDelay(Duration cumulativeDelay) =>
RetryPolicy((status) => status.cumulativeDelay >= cumulativeDelay
? RetryDecision.giveUp()
: decideOn(status));
RetryPolicy followedBy(RetryPolicy policy) => RetryPolicy((status) {
final thisDecision = decideOn(status);
return thisDecision.isGivingUp ? thisDecision : policy.decideOn(status);
});
RetryPolicy join(RetryPolicy policy) => RetryPolicy((status) {
final thisDecision = decideOn(status);
final thatDecision = policy.decideOn(status);
if (thisDecision.isGivingUp || thatDecision.isGivingUp) {
return RetryDecision.giveUp();
} else {
return RetryDecision.delayAndRetry(
Duration(
microseconds: max(
thisDecision.delay.inMicroseconds,
thatDecision.delay.inMicroseconds,
),
),
);
}
});
RetryPolicy meet(RetryPolicy policy) => RetryPolicy((status) {
final thisDecision = decideOn(status);
final thatDecision = policy.decideOn(status);
if (!thisDecision.isGivingUp && !thatDecision.isGivingUp) {
return RetryDecision.delayAndRetry(
Duration(
microseconds: min(
thisDecision.delay.inMicroseconds,
thatDecision.delay.inMicroseconds,
),
),
);
} else if (thisDecision.isGivingUp) {
return thatDecision;
} else if (thatDecision.isGivingUp) {
return thisDecision;
} else {
return RetryDecision.giveUp();
}
});
RetryPolicy mapDelay(Function1<Duration, Duration> f) => RetryPolicy(
(status) {
final decision = decideOn(status);
if (decision.isGivingUp) {
return RetryDecision.giveUp();
} else {
return RetryDecision.delayAndRetry(f(decision.delay));
}
},
);
}
class RetryDetails {
final int retriesSoFar;
final Duration cumulativeDelay;
final bool givingUp;
final Option<Duration> upcomingDelay;
RetryDetails(
this.retriesSoFar,
this.cumulativeDelay,
this.givingUp,
this.upcomingDelay,
);
@override
String toString() => 'RetryDetails('
'retriesSoFar = $retriesSoFar, '
'cumulativeDelay = $cumulativeDelay, '
'givingUp = $givingUp, '
'upcomingDelay = $upcomingDelay)';
}
extension RetryOps<A> on Task<A> {
Task<A> retrying(
RetryPolicy policy, {
Function1<A, bool>? wasSuccessful,
Function1<Object, bool>? isWorthRetrying,
Function2<Object, RetryDetails, Task<void>>? onError,
Function2<A, RetryDetails, Task<void>>? onFailure,
}) =>
_retryingImpl(
policy,
wasSuccessful ?? (_) => true,
isWorthRetrying ?? (_) => true,
onError ?? (_, __) => Task.unit,
onFailure ?? (_, __) => Task.unit,
RetryStatus.initial(),
this,
);
Task<A> _retryingImpl(
RetryPolicy policy,
Function1<A, bool> wasSuccessful,
Function1<Object, bool> isWorthRetrying,
Function2<Object, RetryDetails, Task<void>> onError,
Function2<A, RetryDetails, Task<void>> onFailure,
RetryStatus status,
Task<A> action,
) {
return action.attempt().flatMap((result) {
return result.fold(
(err) => isWorthRetrying(err)
? _onFailureOrError(policy, wasSuccessful, isWorthRetrying, onError,
onFailure, status, action, (details) => onError(err, details))
: Task.failed(err.toString()),
(a) => wasSuccessful(a)
? Task.value(a)
: _onFailureOrError(policy, wasSuccessful, isWorthRetrying, onError,
onFailure, status, action, (details) => onFailure(a, details)),
);
});
}
Task<A> _onFailureOrError(
RetryPolicy policy,
Function1<A, bool> wasSuccessful,
Function1<Object, bool> isWorthRetrying,
Function2<Object, RetryDetails, Task<void>> onError,
Function2<A, RetryDetails, Task<void>> onFailure,
RetryStatus status,
Task<A> action,
Function1<RetryDetails, Task<void>> beforeRecurse,
) {
final decision = policy.decideOn(status);
final newStatus = decision._updateStatus(status);
final details = decision._detailsFromStatus(newStatus);
return Task.value(decision.isGivingUp).ifM(
Task.failed('Retry giving up.'),
beforeRecurse(details).andThen(Task.sleep(decision.delay)).andThen(
_retryingImpl(policy, wasSuccessful, isWorthRetrying, onError,
onFailure, newStatus, action)),
);
}
}
@Timeout(Duration(minutes: 2))
import 'package:test/test.dart';
import 'package:dartz/dartz.dart';
void main() {
test('RetryPolicy.limitRetries', () async {
final policy = RetryPolicy.limitRetries(10);
final status0 = RetryStatus(0, Duration.zero, none());
final status10 = RetryStatus(10, Duration.zero, none());
expect(
policy.decideOn(status0), RetryDecision.delayAndRetry(Duration.zero));
expect(policy.decideOn(status10), RetryDecision.giveUp());
});
test('RetryPolicy.constantDelay', () async {
final delay = Duration(seconds: 42);
final policy = RetryPolicy.constantDelay(delay);
final status0 = RetryStatus(0, Duration.zero, none());
final status9 = RetryStatus(9, Duration.zero, none());
expect(policy.decideOn(status0), RetryDecision.delayAndRetry(delay));
expect(policy.decideOn(status9), RetryDecision.delayAndRetry(delay));
});
test('RetryPolicy.exponentialBackoff', () async {
final baseDelay = Duration(seconds: 1);
final policy = RetryPolicy.exponentialBackoff(baseDelay);
final status0 = RetryStatus(0, Duration.zero, none());
final status3 = RetryStatus(3, Duration.zero, none());
expect(policy.decideOn(status0), RetryDecision.delayAndRetry(baseDelay));
expect(policy.decideOn(status3),
RetryDecision.delayAndRetry(Duration(seconds: 8)));
});
test('RetryPolicy.capDelay', () async {
final baseDelay = Duration(seconds: 1);
final maxDelay = Duration(minutes: 1);
final policy = RetryPolicy.exponentialBackoff(baseDelay).capDelay(maxDelay);
final status0 = RetryStatus(0, Duration.zero, none());
final status3 = RetryStatus(3, Duration.zero, none());
final status100 = RetryStatus(100, Duration.zero, none());
expect(policy.decideOn(status0), RetryDecision.delayAndRetry(baseDelay));
expect(policy.decideOn(status3),
RetryDecision.delayAndRetry(Duration(seconds: 8)));
expect(policy.decideOn(status100), RetryDecision.delayAndRetry(maxDelay));
});
test('RetryPolicy.stack safety', () async {
int count = 0;
final logFailure = Task.delay(() {
if (count % 100 == 0) {
print('count: $count');
}
});
final t = Task.delay(() => count++).retrying(
RetryPolicy.limitRetries(10000)
.join(RetryPolicy.constantDelay(Duration(milliseconds: 10))),
wasSuccessful: (count) => count > 9999,
onFailure: (_, __) => logFailure,
);
await t.timeout(Duration(minutes: 2)).run();
expect(count, 10000);
});
test("Task.retrying playground", () async {
var count = 0;
var errorsLogged = 0;
var failuresLogged = 0;
final logError = (err, details) =>
Task.print('[${DateTime.now()}] err: $err / $details')
.andThen(Task.delay(() => errorsLogged++));
final logFailure = (a, details) =>
Task.print('[${DateTime.now()}] fail: $a / $details')
.andThen(Task.delay(() => failuresLogged++));
final retryingTask = Task.delay<int>(
() => count++,
// () => throw FormatException('kaboom'),
).retrying(
RetryPolicy.limitRetries(10)
.join(RetryPolicy.exponentialBackoff(Duration(milliseconds: 5))),
// .join(RetryPolicy.fullJitter(Duration(milliseconds: 500))),
// .capDelay(Duration(milliseconds: 1200)),
wasSuccessful: (a) => a >= 10,
// retryOn: (err) => err is! FormatException,
onError: logError,
onFailure: logFailure,
);
final result =
await retryingTask.timeout(const Duration(seconds: 10)).attempt().run();
// final result = await retryingTask.attempt().run();
result.fold(
(err) {
expect(errorsLogged, 10);
},
(a) {
expect(a, 10);
expect(errorsLogged, 0);
},
);
});
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment