Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
A Finagle Cluster that uses DNS, based on ZookeeperServerSetCluster.
import com.twitter.concurrent.Spool
import com.twitter.finagle.builder.Cluster
import com.twitter.finagle.util.DefaultTimer
import com.twitter.logging.Logger
import com.twitter.util.FuturePool
import com.twitter.util.{Duration, Future, JavaTimer, Promise, Return, Time, Timer}
import java.net.InetAddress
import java.net.UnknownHostException
import java.net.{InetSocketAddress, SocketAddress}
import java.security.Security
import scala.collection._
import com.twitter.conversions.time._
/**
 * A Cluster of SocketAddresses whose membership is refreshed by
 * re-resolving `host` through DNS on a fixed period. Useful for hosts
 * whose addresses change frequently (e.g. Akamai/AmazonS3/Google),
 * especially when they are reached from behind a firewall.
 *
 * The periodic lookup is scheduled on `timer` as part of construction,
 * starting at `Time.now` and repeating every `ttl`; call `stop()` to
 * cancel it.
 *
 * @param host  hostname handed to `InetAddress.getAllByName`
 * @param port  port paired with every resolved address
 * @param ttl   period between successive DNS lookups
 * @param timer timer on which the periodic lookup is scheduled
 */
class DnsCluster(host: String, port: Int, ttl: Duration, timer: Timer)
  extends Cluster[SocketAddress] {

  private[this] val log = Logger(this.getClass)

  // Addresses currently advertised by this cluster. Read and written only
  // inside `synchronized` sections (see `updateAddress` and `snap`).
  private[this] val knownAddresses = new mutable.HashSet[SocketAddress]

  // The not-yet-fulfilled tail of the change stream handed out by `snap`.
  private[this] var changeTail = new Promise[Spool[Cluster.Change[SocketAddress]]]

  /**
   * Performs the (blocking) DNS lookup and pairs every returned address
   * with `port`. Exposed for testing.
   */
  protected[util] def blockingDnsCall: Set[SocketAddress] = {
    val resolved: Array[InetAddress] = InetAddress.getAllByName(host)
    val endpoints = resolved.map { inet =>
      new InetSocketAddress(inet, port): SocketAddress
    }
    endpoints.toSet
  }

  /**
   * Runs `blockingDnsCall` on `FuturePool.unboundedPool`. An
   * `UnknownHostException` is logged and turned into an empty set, which
   * means the next update removes every currently known address.
   * Exposed for testing.
   */
  protected[util] def resolveHost: Future[Set[SocketAddress]] = {
    val lookup = FuturePool.unboundedPool { blockingDnsCall }
    lookup handle {
      case _: UnknownHostException =>
        log.error("DNS failed for host %s", host)
        Set.empty[SocketAddress]
    }
  }

  /**
   * Reconciles the advertised addresses with `latest`, emitting one
   * `Cluster.Add` per new address followed by one `Cluster.Rem` per
   * address that disappeared.
   */
  private[this] def updateAddress(latest: Set[SocketAddress]) = synchronized {
    // Fulfils the current tail with `change` and installs a fresh tail
    // for the next change.
    def publish(change: Cluster.Change[SocketAddress]) = {
      val nextTail = new Promise[Spool[Cluster.Change[SocketAddress]]]
      changeTail.update(Return(change *:: nextTail))
      changeTail = nextTail
    }

    val membershipChanged = latest != knownAddresses
    if (membershipChanged) {
      log.info("%s resolved as %s", host, latest.mkString(", "))
    }

    // Both differences are taken before the set is mutated below.
    val arrivals = latest diff knownAddresses
    val departures = knownAddresses diff latest

    for (address <- arrivals) {
      knownAddresses += address
      publish(Cluster.Add(address))
    }
    for (address <- departures) {
      knownAddresses -= address
      publish(Cluster.Rem(address))
    }
  }

  log.info("starting DNS cluster for %s:%d, ttl %s", host, port, ttl)

  // Periodic refresh: resolve, then apply the result or log the failure.
  private[this] val refreshTask = timer.schedule(Time.now, ttl) {
    resolveHost
      .onSuccess { resolved => updateAddress(resolved) }
      .onFailure { cause => log.error(cause, "failed to resolve %s", host) }
  }

  /** Cancels the periodic DNS refresh. */
  def stop() = refreshTask.cancel()

  /**
   * Returns the currently known addresses together with the future
   * stream of subsequent changes, both read under the same lock that
   * guards updates.
   */
  def snap: (Seq[SocketAddress], Future[Spool[Cluster.Change[SocketAddress]]]) =
    synchronized {
      val members = knownAddresses.toSeq
      (members, changeTail)
    }
}
/** Factory methods for [[DnsCluster]]. */
object DnsCluster {

  /**
   * Creates a DnsCluster that re-resolves `host` every `ttl`, scheduled
   * on `DefaultTimer.twitter`.
   *
   * @param host hostname to resolve
   * @param port port paired with every resolved address
   * @param ttl  period between successive DNS lookups
   */
  def apply(host: String, port: Int, ttl: Duration): DnsCluster =
    new DnsCluster(host, port, ttl, DefaultTimer.twitter)

  /**
   * Creates a DnsCluster whose refresh period is taken from the JVM
   * security property `networkaddress.cache.ttl` (in seconds).
   *
   * The configured value is clamped to the range [5 seconds, 1 hour], so
   * values below 5 seconds (including negative ones) become 5 seconds.
   * When the property is unset or is not a valid integer, 10 seconds is
   * used.
   *
   * See http://stackoverflow.com/questions/1256556/any-way-to-make-java-honor-the-dns-caching-timeout-ttl
   *
   * @param host hostname to resolve
   * @param port port paired with every resolved address
   */
  def apply(host: String, port: Int): DnsCluster = {
    val minTtl = 5.seconds
    val defaultTtl = 10.seconds
    val maxTtl = 1.hour

    // The property is free-form text: a malformed value must not make
    // cluster construction throw, so it is treated the same as "unset".
    val configuredSeconds: Option[Int] =
      Option(Security.getProperty("networkaddress.cache.ttl")) flatMap { raw =>
        try {
          Some(raw.trim.toInt)
        } catch {
          case _: NumberFormatException => None
        }
      }

    val ttl = configuredSeconds map { secs =>
      secs.seconds max minTtl min maxTtl
    } getOrElse defaultTtl

    apply(host, port, ttl)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment