Skip to content

Instantly share code, notes, and snippets.

@sirmax
Last active December 25, 2015 21:39
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sirmax/7043591 to your computer and use it in GitHub Desktop.
Save sirmax/7043591 to your computer and use it in GitHub Desktop.
import com.twitter.concurrent.{Offer, Broker}
import com.twitter.conversions.time._
import com.twitter.finagle._
import com.twitter.util._
object Reeval {

  /** Given a by-name `mkVar: => Var[T]` and a time interval, create another `Var[T]` that:
   * - initially has a value equal to the value of evaluated `mkVar`
   * - passes all updates of the created var to the observers
   * - after the interval passes, detaches itself from the var, evaluates `mkVar` again, and attaches to it.
   *
   * The underlying var is only observed while the returned var has at least one observer:
   * the first observer triggers a fresh evaluation of `mkVar`, and closing the last one
   * detaches from it and stops the periodic re-evaluation.
   *
   * This may be used with `Var`s that keep the initially evaluated result,
   * but their underlying value could actually change.
   * For example as of Finagle 6.7 `Resolver.eval(name).bind()` yields a set of addresses
   * that won't change even if there is a change to the network, the DNS, etc.
   *
   * @param mkVar          by-name factory, re-evaluated on every (re)attachment
   * @param timer          timer used to schedule the re-evaluation timeout
   * @param reevalInterval time between two consecutive evaluations of `mkVar`
   * @return a `Var[T]` mirroring the most recently evaluated `mkVar`
   */
  def aVar[T](mkVar: => Var[T])(implicit timer: Timer, reevalInterval: Duration = 5.seconds): Var[T] = new Var[T] {
    // Holds the value handed to observers; seeded with the current value of a fresh `mkVar`.
    private[this] val store = Var(mkVar())
    // Carries +1 / -1 observer-count deltas from `observe` into the Offer loop.
    private[this] val observersCountBroker = new Broker[Int]

    // All vars are guarded by the Offer loop
    private[this] var observersCount = 0
    private[this] var currentObserver = Closable.nop
    private[this] var nextUpdate: Offer[Unit] = Offer.never

    // Handles exactly one event (an observer-count change or the re-evaluation timeout)
    // and then re-arms itself, so the mutable state above is only touched from here.
    private def loop(): Unit = Offer.select(
      observersCountBroker.recv map { delta =>
        val oldCount = observersCount
        observersCount += delta
        (oldCount, observersCount) match {
          case (0, 1) =>
            // First observer: attach to a freshly evaluated var and start the timeout.
            currentObserver = mkVar.observe(store.update)
            nextUpdate = Offer.timeout(reevalInterval)
          case (1, 0) =>
            // Last observer gone: detach and stop re-evaluating.
            currentObserver.close()
            nextUpdate = Offer.never
          case _ => // no changes needed
        }
      },
      nextUpdate const {
        // Interval elapsed: swap the observed var for a freshly evaluated one.
        currentObserver.close()
        currentObserver = mkVar.observe(store.update)
        nextUpdate = Offer.timeout(reevalInterval)
      }
    ).ensure(loop())

    /** Registers `callback` on the internal store and reports the new observer to the loop.
     *
     * `depth` is not forwarded: the callback is registered through the public
     * `store.observe`.
     *
     * The returned `Closable` first detaches `callback`, then reports the observer as gone.
     * The count is decremented at most once, however many times the `Closable` is closed;
     * otherwise a repeated close would drive the count negative and the `(0, 1)`
     * transition that re-attaches to the underlying var would never match again.
     */
    protected def observe(depth: Int, callback: T => Unit): Closable = {
      observersCountBroker ! 1
      val closable = store.observe(callback)
      val released = new java.util.concurrent.atomic.AtomicBoolean(false)
      val release = Closable make { _ =>
        if (released.compareAndSet(false, true)) observersCountBroker ! -1
        else Future.value(())
      }
      Closable.sequence(closable, release)
    }

    // Kick off the loop
    loop()
  }

  /** Wraps `n` in a `Name` whose `bind()` returns an [[aVar]] that re-evaluates `n.bind()`.
   *
   * @param n              the name whose binding should be periodically re-evaluated
   * @param timer          timer used to schedule the re-evaluation timeout
   * @param reevalInterval time between two consecutive evaluations of `n.bind()`
   */
  def name(n: Name)(implicit timer: Timer, reevalInterval: Duration = 5.seconds): Name = new Name {
    def bind() = aVar(n.bind())
  }
}
import org.scalatest.FreeSpec
import com.twitter.conversions.time._
import com.twitter.util.{MockTimer, Time, Var}
/** Checks `Reeval.aVar` against a frozen clock and a `MockTimer`:
 * updates are forwarded, the var reference is re-evaluated only after the interval,
 * and nothing is delivered once the observer is closed.
 */
class ReevalTest extends FreeSpec {
  "reeval while observing" in Time.withCurrentTimeFrozen { tc =>
    val timer = new MockTimer
    var countVar = Var(1)
    // Passed by name, so every re-evaluation picks up the current `countVar` reference.
    def currentCountVar = countVar
    var observedValue = 0
    val observer = Reeval.aVar(currentCountVar)(timer, 1.second).observe(observedValue = _)

    // initial value
    assert(observedValue === 1)

    // update the var value
    countVar() = 2
    assert(observedValue === 2)

    // changed the var reference, but the reeval interval hasn't passed yet
    countVar = Var(3)
    tc.advance(500.millis)
    timer.tick()
    assert(observedValue === 2)

    // over the reeval interval
    tc.advance(1.second)
    timer.tick()
    assert(observedValue === 3)

    // no updates after observer is closed
    observer.close()
    // Must be an update (`=`), not a comparison (`===`), or this step checks nothing.
    countVar() = 4
    assert(observedValue === 3)

    // replacing the var reference must not reach the closed observer either
    countVar = Var(4)
    tc.advance(1.second)
    timer.tick()
    assert(observedValue === 3)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment