Skip to content

Instantly share code, notes, and snippets.

@maheshkelkar
Created July 12, 2016 22:38
Show Gist options
  • Save maheshkelkar/75b87741008adf7d912cd9a36e40c862 to your computer and use it in GitHub Desktop.
Save maheshkelkar/75b87741008adf7d912cd9a36e40c862 to your computer and use it in GitHub Desktop.
FailureAccrualFactory.scala changes for https://github.com/twitter/finagle/issues/524
diff --git a/finagle-core/src/main/scala/com/twitter/finagle/service/FailureAccrualFactory.scala b/finagle-core/src/main/scala/com/twitter/finagle/service/FailureAccrualFactory.scala
index 50956e2..5c2bcbb 100644
--- a/finagle-core/src/main/scala/com/twitter/finagle/service/FailureAccrualFactory.scala
+++ b/finagle-core/src/main/scala/com/twitter/finagle/service/FailureAccrualFactory.scala
@@ -1,5 +1,7 @@
package com.twitter.finagle.service
+import java.net.InetSocketAddress
+
import com.twitter.conversions.time._
import com.twitter.finagle.Stack.{Params, Role}
import com.twitter.finagle._
@@ -10,6 +12,7 @@ import com.twitter.finagle.util.DefaultLogger
import com.twitter.logging.Level
import com.twitter.util._
import java.util.logging.Logger
+
import scala.util.Random
object FailureAccrualFactory {
@@ -310,7 +313,11 @@ class FailureAccrualFactory[Req, Rep] private[finagle](
}
}
- private[this] val onServiceAcquisitionFailure: Throwable => Unit = { _ => didFail() }
+ private[this] val onServiceAcquisitionFailure: Throwable => Unit = { _ =>
+ logger.log(Level.WARNING, s"""***FailureAccrualFactory: onServiceAcquisitionFailure for "$label"""")
+ stopProbing()
+ didFail()
+ }
protected def isSuccess(reqRep: ReqRep): Boolean =
responseClassifier.applyOrElse(reqRep, ResponseClassifier.Default) match {
@@ -322,6 +329,7 @@ class FailureAccrualFactory[Req, Rep] private[finagle](
// Only count revivals when the probe succeeds.
state match {
case ProbeClosed =>
+ logger.log(Level.WARNING, s"""***FailureAccrualFactory: ALIVE for "$label"""")
revivalCounter.incr()
failureAccrualPolicy.revived()
state = Alive
@@ -337,12 +345,13 @@ class FailureAccrualFactory[Req, Rep] private[finagle](
if (state == Alive) removalCounter.incr()
state = Dead
+ logger.log(Level.WARNING, s"""***FailureAccrualFactory: DEAD for "$label"""")
val timerTask = timer.schedule(duration.fromNow) { startProbing() }
reviveTimerTask = Some(timerTask)
- logger.log(Level.INFO, s"""FailureAccrualFactory marking connection to "$label" as dead. Remote Address: ${endpoint.toString}""")
+ logger.log(Level.WARNING, s"""***FailureAccrualFactory marking connection to "$label" as dead. Remote Address: ${endpoint.toString()}""")
removedForCounter.incr(duration.inMilliseconds.toInt)
didMarkDead()
@@ -359,10 +368,33 @@ class FailureAccrualFactory[Req, Rep] private[finagle](
* The service must satisfy one request before accepting more.
*/
protected def startProbing() = svcFacSelf.synchronized {
+ logger.log(Level.WARNING, s"""***FailureAccrualFactory: startProbing for "$label"""")
state = ProbeOpen
cancelReviveTimerTask()
}
+ /**
+ * Exit 'Probing' state (if necessary)
+ *
+ * The follow-on operation (i.e. the result of first request while probing) will determine
+ * whether the factory transitions to Alive (successful) or Dead (unsuccessful).
+ */
+ private[this] def stopProbing() = {
+ state match {
+ case ProbeOpen =>
+ logger.log(Level.WARNING, s"""***FailureAccrualFactory: stopProbing for "$label"""")
+ probesCounter.incr()
+ svcFacSelf.synchronized {
+ state match {
+ case ProbeOpen =>
+ state = ProbeClosed
+ case _ =>
+ }
+ }
+ case _ =>
+ }
+ }
+
def apply(conn: ClientConnection) = {
underlying(conn).map { service =>
// N.B. the reason we can't simply filter the service factory is so that
@@ -375,18 +407,9 @@ class FailureAccrualFactory[Req, Rep] private[finagle](
// ProbeClosed state. The result of first to complete will determine
// whether the factory transitions to Alive (successful) or Dead
// (unsuccessful).
- state match {
- case ProbeOpen =>
- probesCounter.incr()
- svcFacSelf.synchronized {
- state match {
- case ProbeOpen => state = ProbeClosed
- case _ =>
- }
- }
- case _ =>
- }
+ stopProbing()
+ // Invoke service
service(request).respond { rep =>
if (isSuccess(ReqRep(request, rep))) didSucceed()
else didFail()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment