Skip to content

Instantly share code, notes, and snippets.

@atty303

atty303/ConsulQuery.scala

Last active Aug 15, 2018
Embed
What would you like to do?
Consul Resolver for Finagle
example.finagle.consul.ConsulResolver
package example.finagle.consul
/**
* @see https://github.com/kachayev/finagle-consul/blob/master/src/main/scala/com/twitter/finagle/consul/ConsulQuery.scala
*/
case class ConsulQuery(hosts: String,
name: String,
tags: Set[String],
dc: Option[String],
near: Option[String])
object ConsulQuery {
private case class PathDecomposer(components: Seq[String]) {
val (paramComponents, nameComponents) =
components.partition(_.contains("="))
val params = paramComponents.map {
_.split("=", 2) match {
case Array(key, value) => key -> value
}
}
def get(key: String): Seq[String] =
params.withFilter(_._1 == key).map(_._2)
}
def decodeString(query: String): Option[ConsulQuery] = {
query.stripPrefix("/").split('/') match {
case Array(hosts, paths @ _ *) =>
val d = PathDecomposer(paths)
val tags = d.get("tag").toSet
val dc = d.get("dc").headOption
val near = d.get("near").headOption
d.nameComponents.headOption.map { name =>
ConsulQuery(hosts, name, tags, dc, near)
}
case _ => None
}
}
}
package example.finagle.consul
import java.util.{List => JList}
import com.fasterxml.jackson.annotation.{JsonCreator, JsonProperty}
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.google.common.cache.{CacheBuilder, CacheLoader}
import com.twitter.finagle._
import com.twitter.finagle.http.Status
import com.twitter.finagle.stats.DefaultStatsReceiver
import com.twitter.logging.Logger
import com.twitter.util._
import scala.collection.JavaConverters._
/**
* A finagle Resolver for services registered in the consul catalog.
*
* eg) `consul!host1:8500,host2:8500/dc=dc1/tag=prod/tag=finagle/service-name`
*
* @see https://github.com/kachayev/finagle-consul/blob/master/src/main/scala/com/twitter/finagle/consul/ConsulResolver.scala
*/
class ConsulResolver extends Resolver {
private val log = Logger()
private val jsonReader = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
.readerFor(classOf[ServiceLocation])
private val httpClientCache = CacheBuilder
.newBuilder()
.maximumSize(100L)
.build(
new CacheLoader[String, ServiceFactory[http.Request, http.Response]]() {
override def load(
hosts: String): ServiceFactory[http.Request, http.Response] = {
Http.client.newClient(hosts, "consul-resolver")
}
})
private val statsReceiver = DefaultStatsReceiver.scope("consul")
private val successes = statsReceiver.counter("successes")
private val failures = statsReceiver.counter("failures")
private val timer: Timer =
new JavaTimer(isDaemon = true, name = Some("ConsulResolver-JavaTimer"))
override val scheme: String = "consul"
override def bind(arg: String): Var[Addr] = {
log.debug(s"bind: resolve new binding [arg:$arg]")
ConsulQuery
.decodeString(arg)
.map(addrOf)
.getOrElse(Var.value(Addr.Failed(
new ConsulResolverException(s"Invalid address [arg:$arg]"))))
}
/**
* Watch changes in members using [[https://www.consul.io/docs/agent/http.html Blocking Queries]].
*/
private def addrOf(query: ConsulQuery): Var[Addr] =
Var.async(Addr.Pending: Addr) { u: Updatable[Addr] =>
def update(consulIndex: Option[String]): Future[Unit] = {
readCatalog(query, consulIndex).flatMap {
case (addr, nextIndex) =>
u() = addr
val waitF = addr match {
case _: Addr.Bound if nextIndex.isDefined => Future.Done
case _ => Future.sleep(ConsulResolver.retryInterval)(timer)
}
waitF.flatMap { _ =>
update(nextIndex)
}
}
}
val f = update(None).interruptible()
new Closable {
override def close(deadline: Time): Future[Unit] = {
f.raise(new FutureCancelledException)
httpClientCache.invalidate(query.hosts)
f
}
}
}
/**
* Lookup a service catalog from Consul via HTTP API.
*
* @see https://www.consul.io/docs/agent/http/catalog.html#catalog_service
*/
private def readCatalog(
q: ConsulQuery,
consulIndex: Option[String]): Future[(Addr, Option[String])] = {
val req = http
.RequestBuilder()
.url(s"http://${q.hosts}" + catalogUri(q, consulIndex))
.buildGet()
log.debugLazy(s"readCatalog: [q:$q consulIndex:$consulIndex req:$req]")
httpClientCache
.get(q.hosts)
.toService(req)
.map {
case rep if rep.status == Status.Ok =>
rep.withInputStream { content =>
try {
val locs =
jsonReader.readValues[ServiceLocation](content).asScala.toSeq
log.debugLazy(s"readCatalog: ServiceLocation=$locs")
val addrs = locs.map(locationToAddr)
val index = rep.headerMap.get("X-Consul-Index")
log.debugLazy(
s"readCatalog: done [q:$q consulIndex:$consulIndex req:$req index:$index addrs:$addrs]")
// if addrs.isEmpty, returns Addr.Neg instead ?
if (addrs.nonEmpty) {
successes.incr()
(Addr.Bound(addrs: _*), index)
} else {
failures.incr()
log.warning(
s"readCatalog: no hosts available [q:$q consulIndex:$consulIndex req:$req]")
(Addr.Bound(addrs: _*), index)
}
} catch {
case NonFatal(e) =>
failures.incr()
log.warning(e, s"couldn't parse consul response [q:$q]")
(Addr.Failed(
new ConsulResolverException(
s"Couldn't parse response [q:$q]",
e)),
consulIndex)
}
}
case rep =>
failures.incr()
log.debug(s"Failed to query consul [q:$q status:${rep.status}]")
(Addr.Failed(
new ConsulResolverException(
s"Failed to query consul [q:$q status:${rep.status}]")),
consulIndex)
}
.rescue {
case NonFatal(e) =>
failures.incr()
log.debug(e, s"Failed to query consul [q:$q req:$req]")
Future.value(
(Addr.Failed(
new ConsulResolverException(
s"Failed to query consul [q:$q req:$req]",
e)),
consulIndex))
}
}
private def locationToAddr(loc: ServiceLocation): Address = {
val address =
if (loc.serviceAddress.nonEmpty) loc.serviceAddress else loc.address
Address(address, loc.servicePort)
}
private def catalogUri(q: ConsulQuery, consulIndex: Option[String]): String = {
val params: Seq[(String, String)] = Seq(
q.dc.map("dc" -> _).toSeq,
q.tags.map("tag" -> _).toSeq,
q.near.map("near" -> _).toSeq,
consulIndex.map("index" -> _).toSeq,
Seq("wait" -> s"${ConsulResolver.blockingQueryWait.inSeconds}s")
).flatten
http.Request.queryString(s"/v1/catalog/service/${q.name}", params: _*)
}
}
object ConsulResolver {
/**
* Set a maximum duration for the blocking query.
*/
def blockingQueryWait_=(d: Duration): Unit = {
require(d.inSeconds > 0 && d.inSeconds <= 10 * 60)
_blockingQueryWait = d
}
def blockingQueryWait: Duration = _blockingQueryWait
private var _blockingQueryWait: Duration = Duration.parse("5.minutes")
/**
* Set a interval duration when query to consul is failed.
*/
def retryInterval_=(d: Duration): Unit = {
require(d.isFinite)
_retryInterval = d
}
def retryInterval: Duration = _retryInterval
private var _retryInterval: Duration = Duration.parse("5.seconds")
}
class ConsulResolverException(msg: String, cause: Throwable = null)
extends Exception(msg, cause)
/**
* The response of [[https://www.consul.io/docs/agent/http/catalog.html#catalog_service /v1/catalog/service/<service>]].
*/
private[consul] case class ServiceLocation @JsonCreator()(
@JsonProperty("Node") node: String,
@JsonProperty("Address") address: String,
@JsonProperty("ServiceID") serviceId: String,
@JsonProperty("ServiceName") serviceName: String,
@JsonProperty("ServiceTags") serviceTags: JList[String],
@JsonProperty("ServiceAddress") serviceAddress: String,
@JsonProperty("ServicePort") servicePort: Int)
Copyright (c) 2016 CyberAgent, Inc.
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.