Created
March 12, 2019 09:51
-
-
Save rpmr/44933c01119bce5adc56a396253d7c41 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cats.data.ValidatedNel | |
import cats.effect.ExitCase.{Canceled, Error} | |
import cats.effect.concurrent.Ref | |
import cats.effect.implicits._ | |
import cats.effect.{Concurrent, Timer} | |
import cats.implicits._ | |
import scala.concurrent.duration._ | |
/** | |
* A `Tap` adjusts the flow of tasks through | |
* an external service in response to observed | |
* failures in the service, always trying to | |
* maximize flow while attempting to meet the | |
* user-defined upper bound on failures. | |
*/ | |
trait Tap[F[_]] { | |
/** | |
* Sends the task through the tap. The | |
* returned task may fail immediately with a | |
* default error depending on the service | |
* being guarded by the tap. | |
*/ | |
def apply[A](effect: F[A]): F[A] | |
} | |
object Tap { | |
/** | |
* Creates a tap that aims for the specified | |
* maximum error rate, using the specified | |
* function to qualify errors (unqualified | |
* errors are not treated as failures for | |
* purposes of the tap), and the specified | |
* default error used for rejecting tasks | |
* submitted to the tap. | |
*/ | |
def make[F[_]]( | |
errBound: Rate, | |
qualified: Throwable => Boolean, | |
rejected: => Throwable | |
)(implicit F: Concurrent[F], T: Timer[F]): F[Tap[F]] = { | |
val up: Boolean => (State => State, State => State) = update(errBound.errorRatio) | |
val sleep = T.sleep(errBound.period) | |
( | |
Ref[F].of(State(0L, 0L, 0d)), | |
Ref[F].of(Counter(0L, 0d)) | |
) mapN { (state, counter) => | |
val shouldReject = state.get flatMap { state => | |
counter.modify(checkReject(state.rejectRate)) | |
} | |
new Tap[F] { | |
override def apply[A](fa: F[A]): F[A] = | |
F.ifM(shouldReject)( | |
F.raiseError(rejected), | |
fa.guaranteeCase { ec => | |
(ec match { | |
case Error(ex) if qualified(ex) => up(true).some | |
case Canceled => none | |
case _ => up(false).some | |
}) map { | |
case (before, after) => | |
F.bracket(state.update(before))(_ => sleep)(_ => state.update(after)).start.void | |
} getOrElse F.unit | |
} | |
) | |
} | |
} | |
} | |
private def update(maxErrorRate: Double)(failed: Boolean): (State => State, State => State) = { | |
val err = if (failed) 1L else 0L | |
( | |
st => { | |
val newCount = st.count + 1 | |
val newErrors = st.errors + err | |
st.copy( | |
count = newCount, | |
errors = newErrors, | |
/* | |
Simplest rejection rate calculation I could think of. | |
Few ideas for improvement: | |
- additional config parameter for upper bound of rejection rate (currently it is `1d - maxErrorRate`) | |
- add more significance to more recent errors (this would require tracking state not only in | |
whole period but also in last fraction (possibly configurable) of that period) | |
- depending on requirements it might be useful to reject all tasks (set rejection rate to 1d) for some time (not sure about this) | |
*/ | |
rejectRate = (newErrors.toDouble / newCount - maxErrorRate) max 0d | |
) | |
}, | |
st => st.copy(count = st.count - 1, errors = st.errors - err) | |
) | |
} | |
/** | |
* Check if task should be rejected using provided rejection rate. | |
* @param rejectRate current rejection rate in range [0, 1) | |
* @return tuple of updated counter and boolean indicating if task should be rejected | |
*/ | |
private def checkReject(rejectRate: Double)(counter: Counter): (Counter, Boolean) = { | |
val newTotal = (counter.total + 1L) max 0L | |
val newFraction = newTotal * rejectRate + counter.fraction | |
if (newFraction >= 1d) { | |
(Counter(0L, newFraction - 1d), true) | |
} else { | |
(Counter(newTotal, counter.fraction), false) | |
} | |
} | |
/** | |
* Counter used as state to track rejections. | |
*/ | |
final private case class Counter(total: Long, fraction: Double) | |
/** | |
* State used to track number of finished tasks, | |
* number of tasks completed with qualified error and | |
* current rejection rate. | |
*/ | |
final private case class State( | |
count: Long, | |
errors: Long, | |
rejectRate: Double | |
) | |
final case class Rate private (errorRatio: Double, period: FiniteDuration) | |
object Rate { | |
def apply(errorRatio: Double, period: FiniteDuration): ValidatedNel[String, Rate] = | |
if (errorRatio >= 0d && errorRatio < 1d && period > Duration.Zero) { | |
new Rate(errorRatio, period).validNel | |
} else { | |
"Percentage must in [0d, 1d) and period must be positive".invalidNel | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment