Skip to content

Instantly share code, notes, and snippets.

@rpmr
Created March 12, 2019 09:51
Show Gist options
  • Save rpmr/44933c01119bce5adc56a396253d7c41 to your computer and use it in GitHub Desktop.
Save rpmr/44933c01119bce5adc56a396253d7c41 to your computer and use it in GitHub Desktop.
import cats.data.ValidatedNel
import cats.effect.ExitCase.{Canceled, Error}
import cats.effect.concurrent.Ref
import cats.effect.implicits._
import cats.effect.{Concurrent, Timer}
import cats.implicits._
import scala.concurrent.duration._
/**
 * A `Tap` sits in front of an external service and regulates
 * how many tasks are let through to it. It reacts to failures
 * observed in the service: the goal is to let as much work
 * through as possible while still trying to stay within the
 * user-defined upper bound on failures.
 *
 * @tparam F effect type in which submitted tasks are expressed
 */
trait Tap[F[_]] {

  /**
   * Submits a task to the tap.
   *
   * Depending on the state of the guarded service, the
   * resulting task may fail right away with a default error
   * instead of running `effect`.
   *
   * @param effect the task to send through the tap
   * @tparam A result type of the task
   * @return the task as regulated by the tap
   */
  def apply[A](effect: F[A]): F[A]
}
object Tap {

  /**
   * Creates a tap that aims for the specified maximum error rate.
   *
   * Every task that is let through and finishes (successfully or with an
   * error) is counted in a shared state for `errBound.period`; after that
   * time it is removed from the statistics again. Canceled tasks are not
   * counted at all. The rejection rate derived from these statistics decides
   * how many of the submitted tasks are refused.
   *
   * @param errBound  target error ratio and the period during which a
   *                  finished task stays in the statistics
   * @param qualified decides whether an error counts as a failure for the
   *                  purposes of the tap; errors for which it returns `false`
   *                  are counted like successes
   * @param rejected  default error raised for tasks rejected by the tap;
   *                  by-name, so it is evaluated for every rejection
   * @return an effect that allocates a new tap with empty statistics
   */
  def make[F[_]](
    errBound: Rate,
    qualified: Throwable => Boolean,
    rejected: => Throwable
  )(implicit F: Concurrent[F], T: Timer[F]): F[Tap[F]] = {
    val up: Boolean => (State => State, State => State) = update(errBound.errorRatio)
    val sleep = T.sleep(errBound.period)

    (
      Ref[F].of(State(0L, 0L, 0d)),
      Ref[F].of(Counter(0L, 0d))
    ) mapN { (state, counter) =>
      // Reads the current rejection rate and atomically advances the counter.
      val shouldReject: F[Boolean] =
        state.get.flatMap(current => counter.modify(checkReject(current.rejectRate)))

      // Adds one finished task to the statistics and, on a separate fiber,
      // removes it again once the period has elapsed. `bracket` makes sure
      // the removal happens even if the sleeping fiber is interrupted.
      def record(failed: Boolean): F[Unit] = {
        val (enter, leave) = up(failed)
        F.bracket(state.update(enter))(_ => sleep)(_ => state.update(leave)).start.void
      }

      new Tap[F] {
        override def apply[A](fa: F[A]): F[A] =
          F.ifM(shouldReject)(
            F.raiseError[A](rejected),
            fa.guaranteeCase {
              case Canceled                   => F.unit
              case Error(ex) if qualified(ex) => record(failed = true)
              case _                          => record(failed = false)
            }
          )
      }
    }
  }

  /**
   * Builds the pair of state transitions for one finished task.
   *
   * The first function adds the task to the statistics, the second one
   * removes it again. Both recompute the rejection rate, so the rate also
   * drops when old samples leave the window; otherwise a tap that rejects
   * everything could never reopen, because no new task would finish and
   * refresh the rate.
   *
   * @param maxErrorRate tolerated ratio of failed to finished tasks
   * @param failed       whether the task finished with a qualified error
   * @return `(add, remove)` transitions for the shared [[State]]
   */
  private def update(maxErrorRate: Double)(failed: Boolean): (State => State, State => State) = {
    val err = if (failed) 1L else 0L

    def withCounts(count: Long, errors: Long): State =
      State(count, errors, rejectRateFor(count, errors, maxErrorRate))

    (
      st => withCounts(st.count + 1, st.errors + err),
      st => withCounts(st.count - 1, st.errors - err)
    )
  }

  /**
   * Simplest rejection rate calculation: the amount by which the observed
   * error ratio exceeds the tolerated one, never below zero. An empty
   * window yields zero (this also avoids the `0 / 0 = NaN` case).
   *
   * Few ideas for improvement:
   *  - additional config parameter for the upper bound of the rejection rate
   *    (currently it is `1d - maxErrorRate`)
   *  - add more significance to more recent errors (this would require
   *    tracking state not only in the whole period but also in the last,
   *    possibly configurable, fraction of that period)
   *  - depending on requirements it might be useful to reject all tasks
   *    (set rejection rate to 1d) for some time (not sure about this)
   */
  private def rejectRateFor(count: Long, errors: Long, maxErrorRate: Double): Double =
    if (count <= 0L) 0d
    else (errors.toDouble / count - maxErrorRate) max 0d

  /**
   * Check if task should be rejected using provided rejection rate.
   *
   * A task is rejected once the rate multiplied by the number of tasks seen
   * since the last rejection, plus the remainder carried over from that
   * rejection, reaches one.
   *
   * @param rejectRate current rejection rate in range [0, 1]
   * @param counter    counter state before this task
   * @return tuple of updated counter and boolean indicating if task should be rejected
   */
  private def checkReject(rejectRate: Double)(counter: Counter): (Counter, Boolean) = {
    // `max 0L` guards against wrap-around of the task count.
    val newTotal = (counter.total + 1L) max 0L
    val newFraction = newTotal * rejectRate + counter.fraction
    if (newFraction >= 1d) {
      val carry = newFraction - 1d
      // Carry only the fractional part. Under a steady rate the carry is
      // already below one; after a long run of accepted tasks followed by a
      // rate jump it could be arbitrarily large and would force that many
      // consecutive rejections no matter what the current rate is.
      (Counter(0L, carry - math.floor(carry)), true)
    } else {
      (Counter(newTotal, counter.fraction), false)
    }
  }

  /**
   * Counter used as state to track rejections.
   *
   * @param total    number of tasks accepted since the last rejection
   * @param fraction remainder carried over from the last rejection
   */
  final private case class Counter(total: Long, fraction: Double)

  /**
   * State used to track number of finished tasks,
   * number of tasks completed with qualified error and
   * current rejection rate.
   */
  final private case class State(
    count: Long,
    errors: Long,
    rejectRate: Double
  )

  /**
   * Error bound for a tap: the tolerated ratio of failed tasks and the
   * period during which a finished task is kept in the statistics.
   * Instances are created through the validating `Rate.apply`.
   */
  final case class Rate private (errorRatio: Double, period: FiniteDuration)

  object Rate {

    /**
     * Validates the arguments and builds a [[Rate]].
     *
     * @param errorRatio tolerated error ratio, must be in `[0, 1)`
     * @param period     statistics period, must be positive
     * @return the rate, or an error message when an argument is out of range
     */
    def apply(errorRatio: Double, period: FiniteDuration): ValidatedNel[String, Rate] =
      if (errorRatio >= 0d && errorRatio < 1d && period > Duration.Zero) {
        new Rate(errorRatio, period).validNel
      } else {
        "Percentage must in [0d, 1d) and period must be positive".invalidNel
      }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment