Created
December 8, 2016 14:44
-
-
Save ccw/92da2f5033c0bcf90616d02903901fc6 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.twitter.finagle.zookeeper | |
import com.twitter.common.zookeeper.ServerSet.EndpointStatus | |
import com.twitter.common.zookeeper.{ServerSet, ServerSetImpl, ZooKeeperClient} | |
import com.twitter.finagle.{Announcer, Announcement} | |
import com.twitter.util.{Future, NonFatal, Promise} | |
import java.net.InetSocketAddress | |
import java.util.concurrent.LinkedBlockingQueue | |
import scala.collection.JavaConverters._ | |
import scala.collection.mutable | |
/** | |
* Instead of the original path based ZkAnnouncer, this implementation is address based. | |
* | |
* The best use case of this is for the announcement of Redis Master-Slave servers. | |
* Since each server might be both the master and or could be the slave after re-election, | |
* this announcer could announce the role changes of the server based on the address of the server. | |
*/ | |
class AddressBasedZkAnnouncer(factory: ZkClientFactory) extends Announcer { | |
self => | |
val scheme = "azk" | |
def this() = this(DefaultZkClientFactory) | |
private[this] case class ServerConf(client: ZooKeeperClient, | |
addr: InetSocketAddress, | |
var status: Option[EndpointStatus] = None) | |
private[this] case class ServerSetConf(client: ZooKeeperClient, | |
path: String, | |
serverSet: ServerSet) | |
private[this] case class Mutation(conf: ServerConf, | |
serverSet: Option[ServerSet] = None, | |
onComplete: Promise[Unit]) | |
private[this] var serverSets = Set.empty[ServerSetConf] | |
private[this] var servers = Set.empty[ServerConf] | |
private[this] val q = new LinkedBlockingQueue[Mutation]() | |
private[this] val em = mutable.Map.empty[String, InetSocketAddress].asJava | |
private[this] val mutator = new Thread("ZkAnnouncer Mutator") { | |
setDaemon(true) | |
start() | |
override def run() { | |
while (true) { | |
val change = q.take() | |
try { | |
val conf = change.conf | |
conf.status foreach { status => | |
// expire this server from previous announced path | |
status.leave() | |
conf.status = None | |
} | |
change.serverSet foreach { ss => | |
// join to newly associated path | |
conf.status = Some(ss.join(conf.addr, em)) | |
} | |
change.onComplete.setDone() | |
} catch { | |
case NonFatal(e) => change.onComplete.setException(e) | |
} | |
} | |
} | |
} | |
private[this] def doChange(conf: ServerConf, serverSet: Option[ServerSet] = None): Future[Unit] = { | |
val onComplete = new Promise[Unit] | |
q.offer(Mutation(conf, serverSet, onComplete)) | |
onComplete | |
} | |
def announce(hosts: String, | |
path: String, | |
shardId: Int, | |
addr: InetSocketAddress, | |
endpoint: Option[String] | |
): Future[Announcement] = { | |
val zkHosts = factory.hostSet(hosts) | |
if (zkHosts.isEmpty) | |
Future.exception(new ZkAnnouncerException("ZK client address \"%s\" resolves to nothing".format(hosts))) | |
else | |
announce(factory.get(zkHosts)._1, path, shardId, addr, endpoint) | |
} | |
def announce(client: ZooKeeperClient, | |
path: String, | |
shardId: Int, | |
addr: InetSocketAddress, | |
endpoint: Option[String] | |
): Future[Announcement] = { | |
val serverConf = servers find { s => s.client == client && s.addr == addr } getOrElse { | |
val conf = ServerConf(client, addr) | |
synchronized { | |
servers += conf | |
} | |
conf | |
} | |
val serverSetConf = serverSets find ( p => p.client == client && p.path == path) getOrElse { | |
val conf = ServerSetConf(client, path, new ServerSetImpl(client, path)) | |
synchronized { | |
serverSets += conf | |
} | |
conf | |
} | |
serverConf.synchronized { | |
doChange(serverConf, Some(serverSetConf.serverSet)) map { _ => | |
new Announcement { | |
def unannounce(): Future[Unit] = { | |
serverConf.synchronized { | |
doChange(serverConf) | |
} | |
} | |
} | |
} | |
} | |
} | |
/** | |
* Requiring the shardId here is an unfortunate artifact of the implementation of ServerSets. For most uses | |
* setting it to 0 is sufficient | |
*/ | |
def announce(ia: InetSocketAddress, addr: String): Future[Announcement] = | |
addr.split("!") match { | |
// rzk!host!/full/path!shardId | |
case Array(hosts, path, shardId) => | |
announce(hosts, path, shardId.toInt, ia, None) | |
// rzk!host!/full/path!shardId!endpoint | |
case Array(hosts, path, shardId, endpoint) => | |
announce(hosts, path, shardId.toInt, ia, Some(endpoint)) | |
case _ => | |
Future.exception(new ZkAnnouncerException("Invalid addr \"%s\"".format(addr))) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment