Skip to content

Instantly share code, notes, and snippets.

@ccw
Created December 8, 2016 14:44
Show Gist options
  • Save ccw/92da2f5033c0bcf90616d02903901fc6 to your computer and use it in GitHub Desktop.
Save ccw/92da2f5033c0bcf90616d02903901fc6 to your computer and use it in GitHub Desktop.
package com.twitter.finagle.zookeeper
import com.twitter.common.zookeeper.ServerSet.EndpointStatus
import com.twitter.common.zookeeper.{ServerSet, ServerSetImpl, ZooKeeperClient}
import com.twitter.finagle.{Announcer, Announcement}
import com.twitter.util.{Future, NonFatal, Promise}
import java.net.InetSocketAddress
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.JavaConverters._
import scala.collection.mutable
/**
* Instead of the original path based ZkAnnouncer, this implementation is address based.
*
* A good use case is announcing Redis master/slave servers.
* Since any server may act as the master or as a slave after a re-election,
* this announcer can announce a server's role change based on the address of that server.
*/
class AddressBasedZkAnnouncer(factory: ZkClientFactory) extends Announcer {
  self =>

  val scheme: String = "azk"

  def this() = this(DefaultZkClientFactory)

  /**
   * Bookkeeping for one announced server, looked up by (client, addr).
   *
   * This is deliberately a plain class rather than a case class: `status` is mutable, and a
   * case class would include it in equals/hashCode, so an instance already stored in the
   * `servers` set would change its hash whenever it joins or leaves a server set.
   *
   * `status` is only read and written by the mutator thread.
   */
  private[this] class ServerConf(
    val client: ZooKeeperClient,
    val addr: InetSocketAddress,
    var status: Option[EndpointStatus] = None)

  /** A server set that has been created for a given (client, path) pair. */
  private[this] case class ServerSetConf(
    client: ZooKeeperClient,
    path: String,
    serverSet: ServerSet)

  /**
   * A queued request for the mutator thread: leave whatever the server is currently joined to,
   * then join `serverSet` if one is given. `onComplete` is satisfied with the outcome.
   */
  private[this] case class Mutation(
    conf: ServerConf,
    serverSet: Option[ServerSet] = None,
    onComplete: Promise[Unit])

  // Both registries are read and updated only while holding the lock on `self`.
  private[this] var serverSets = Set.empty[ServerSetConf]
  private[this] var servers = Set.empty[ServerConf]

  private[this] val q = new LinkedBlockingQueue[Mutation]()

  // No additional endpoints are announced; every join passes this empty map.
  private[this] val noEndpoints = mutable.Map.empty[String, InetSocketAddress].asJava

  /** Applies one mutation: leave the currently joined server set (if any), then join the new one. */
  private[this] def applyChange(change: Mutation): Unit = {
    val conf = change.conf
    conf.status foreach { status =>
      // expire this server from the previously announced path
      status.leave()
      conf.status = None
    }
    change.serverSet foreach { ss =>
      // join the newly associated path
      conf.status = Some(ss.join(conf.addr, noEndpoints))
    }
  }

  // Single daemon thread that drains `q`, so all join/leave calls happen sequentially.
  private[this] val mutator: Thread = new Thread("ZkAnnouncer Mutator") {
    override def run(): Unit = {
      while (true) {
        val change = q.take()
        try {
          applyChange(change)
          change.onComplete.setDone()
        } catch {
          case NonFatal(e) => change.onComplete.setException(e)
        }
      }
    }
  }
  // Started here, after the Thread object is fully constructed, rather than from inside
  // its own constructor. Everything run() touches (`q`, `noEndpoints`) is initialized above.
  mutator.setDaemon(true)
  mutator.start()

  /**
   * Enqueues a mutation for `conf` and returns a Future that completes once the mutator
   * thread has applied it (or fails with the exception it raised).
   *
   * @param conf      the server whose membership should change
   * @param serverSet the server set to join, or None to only leave the current one
   */
  private[this] def doChange(conf: ServerConf, serverSet: Option[ServerSet] = None): Future[Unit] = {
    val onComplete = new Promise[Unit]
    q.offer(Mutation(conf, serverSet, onComplete))
    onComplete
  }

  /**
   * Returns the ServerConf registered for (client, addr), creating and registering it first
   * if there is none. Lookup and insert happen under one lock so that concurrent callers
   * cannot register two entries for the same server.
   */
  private[this] def serverConfFor(client: ZooKeeperClient, addr: InetSocketAddress): ServerConf =
    self.synchronized {
      servers.find(s => s.client == client && s.addr == addr).getOrElse {
        val created = new ServerConf(client, addr)
        servers += created
        created
      }
    }

  /**
   * Returns the ServerSet registered for (client, path), creating and registering a new
   * ServerSetImpl first if there is none. Lookup and insert happen under one lock.
   */
  private[this] def serverSetFor(client: ZooKeeperClient, path: String): ServerSet =
    self.synchronized {
      serverSets.find(p => p.client == client && p.path == path).getOrElse {
        val created = ServerSetConf(client, path, new ServerSetImpl(client, path))
        serverSets += created
        created
      }.serverSet
    }

  /**
   * Announces `addr` under `path`, using the ZooKeeper client that `factory` provides for `hosts`.
   *
   * @return the announcement, or a failed Future carrying a ZkAnnouncerException when
   *         `factory.hostSet(hosts)` is empty
   */
  def announce(
    hosts: String,
    path: String,
    shardId: Int,
    addr: InetSocketAddress,
    endpoint: Option[String]
  ): Future[Announcement] = {
    val zkHosts = factory.hostSet(hosts)
    if (zkHosts.isEmpty)
      Future.exception(new ZkAnnouncerException("ZK client address \"%s\" resolves to nothing".format(hosts)))
    else
      announce(factory.get(zkHosts)._1, path, shardId, addr, endpoint)
  }

  /**
   * Announces `addr` in the server set at `path` on `client`. If the same (client, addr) is
   * currently joined to a server set, it leaves that one first.
   *
   * `shardId` and `endpoint` are accepted for signature compatibility but are not used here:
   * the join is performed with the address and an empty endpoint map only.
   *
   * @return an Announcement whose `unannounce()` makes this server leave whatever server set
   *         it is joined to at that time
   */
  def announce(
    client: ZooKeeperClient,
    path: String,
    shardId: Int,
    addr: InetSocketAddress,
    endpoint: Option[String]
  ): Future[Announcement] = {
    val serverConf = serverConfFor(client, addr)
    val serverSet = serverSetFor(client, path)

    // No extra locking is needed around doChange: the queue is thread-safe and the single
    // mutator thread applies mutations in the order they were enqueued.
    doChange(serverConf, Some(serverSet)) map { _ =>
      new Announcement {
        def unannounce(): Future[Unit] = doChange(serverConf)
      }
    }
  }

  /** Failed Future used for every address string that cannot be parsed. */
  private[this] def invalidAddr(addr: String): Future[Announcement] =
    Future.exception(new ZkAnnouncerException("Invalid addr \"%s\"".format(addr)))

  /** Parses a shard id, yielding None instead of throwing when it is not a valid integer. */
  private[this] def parseShardId(raw: String): Option[Int] =
    try Some(raw.toInt) catch { case _: NumberFormatException => None }

  /**
   * Announces `ia` using a "!"-separated address string of the form
   * `hosts!path!shardId` or `hosts!path!shardId!endpoint`.
   * (NOTE(review): the scheme prefix is presumably stripped by the caller - confirm.)
   *
   * The shardId is required by the address format and must be an integer, although this
   * announcer does not use it when joining; 0 is sufficient.
   *
   * @return the announcement, or a failed Future carrying a ZkAnnouncerException when the
   *         string has the wrong number of parts or a non-numeric shardId
   */
  def announce(ia: InetSocketAddress, addr: String): Future[Announcement] =
    addr.split("!") match {
      // hosts!/full/path!shardId
      case Array(hosts, path, shardId) =>
        parseShardId(shardId)
          .map(id => announce(hosts, path, id, ia, None))
          .getOrElse(invalidAddr(addr))
      // hosts!/full/path!shardId!endpoint
      case Array(hosts, path, shardId, endpoint) =>
        parseShardId(shardId)
          .map(id => announce(hosts, path, id, ia, Some(endpoint)))
          .getOrElse(invalidAddr(addr))
      case _ =>
        invalidAddr(addr)
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment