Created
March 26, 2014 00:59
-
-
Save inexplicable/9774903 to your computer and use it in GitHub Desktop.
cluster backed by zk
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.squbs.samples.pubsub.cluster | |
import akka.actor._ | |
import com.twitter.zk.{ZNode, ZkClient, RetryPolicy} | |
import com.twitter.util._ | |
import java.util.concurrent.TimeUnit | |
import scala.collection.mutable | |
import com.typesafe.config.ConfigFactory | |
import org.apache.zookeeper.CreateMode | |
import com.google.common.base.Charsets | |
import akka.util.ByteString | |
import scala.Some | |
import com.twitter.zk.RetryPolicy.Exponential | |
import com.twitter.util.Throw | |
import org.apache.zookeeper.Watcher.Event.EventType | |
/**
 * Created by huzhou on 3/25/14.
 *
 * Akka extension that backs a cluster view with ZooKeeper. On construction it
 *  - makes sure the `zkNamespace`, `zkNamespace/leadership` and `zkNamespace/partition` znodes exist,
 *  - joins the leader election below `zkNamespace/leadership`,
 *  - forwards leader, membership and partition changes to [[ZkClusterActor]] as
 *    `ZkLeaderElected`, `ZkMembersChanged` and `ZkPartitionsChanged` messages.
 *
 * @param system              actor system hosting the `zkCluster` actor
 * @param zkConnectionString  ZooKeeper connection string
 * @param zkNamespace         root znode path under which all cluster state is kept
 * @param zkConnectionTimeout ZooKeeper connection timeout
 * @param zkSessionTimeout    ZooKeeper session timeout
 * @param retryPolicy         retry policy applied to the ZooKeeper client
 */
class ZkCluster(system: ActorSystem,
                zkConnectionString: String,
                zkNamespace: String,
                zkConnectionTimeout: Duration = Duration(1L, TimeUnit.SECONDS),
                zkSessionTimeout: Duration = Duration(10L, TimeUnit.SECONDS),
                retryPolicy: RetryPolicy = Exponential(Duration(1L, TimeUnit.SECONDS))) extends Extension {

  val zkClient = ZkClient(zkConnectionString, Some(zkConnectionTimeout), zkSessionTimeout).withRetryPolicy(retryPolicy)

  val zkClusterActor = system.actorOf(Props.create(classOf[ZkClusterActor], zkClient, zkNamespace), "zkCluster")

  // NOTE(review): this is the address taken from the actor's path; presumably it carries no
  // host/port for a locally created actor, in which case every node would advertise the same
  // address -- confirm against the remoting setup.
  val zkClusterAddress = zkClusterActor.path.address

  // All promises are created before any ZooKeeper call is issued so that callbacks, which run
  // on other threads, never observe an uninitialized field.
  val namespacePromise = Promise[ZNode]()
  val leadershipPromise = Promise[ZNode]()
  val partitionPromise = Promise[ZNode]()

  private val leadershipNode = zkClient(s"$zkNamespace/leadership")
  private val partitionNode = zkClient(s"$zkNamespace/partition")
  private val addressBytes = zkClusterAddress.toString.getBytes(Charsets.UTF_8)
  private val log = system.log

  //begin the process of electing a leader
  ensurePersistent(zkClient(zkNamespace), namespacePromise)

  namespacePromise.respond {
    case Return(_) =>
      ensurePersistent(leadershipNode, leadershipPromise)
      ensurePersistent(partitionNode, partitionPromise)
    case Throw(ex) =>
      // without the namespace neither child can exist; fail both instead of leaving them pending
      leadershipPromise.setException(ex)
      partitionPromise.setException(ex)
  }

  //try gaining the leadership
  leadershipPromise.respond {
    case Return(_) => joinElection()
    case Throw(ex) => log.error(ex, s"zk cluster: cannot prepare $zkNamespace/leadership")
  }

  partitionPromise.respond {
    case Return(_) => watchPartitions()
    case Throw(ex) => log.error(ex, s"zk cluster: cannot prepare $zkNamespace/partition")
  }

  /**
   * Creates `node` as a persistent znode and completes `ready` with it. When the create fails
   * (typically because the node already exists) the existing node is looked up and synced
   * instead; if that fails as well, `ready` is failed rather than left pending forever.
   */
  private def ensurePersistent(node: ZNode, ready: Promise[ZNode]): Unit =
    node.create(mode = CreateMode.PERSISTENT).respond {
      case Return(created) => ready.setValue(created)
      case Throw(_) =>
        node.exists().flatMap(existing => existing.sync()).respond {
          case Return(synced) => ready.setValue(synced)
          case Throw(ex) => ready.setException(ex)
        }
    }

  /** Decodes a znode payload into an address; `None` for missing, empty or malformed data. */
  private def decodeAddress(bytes: Array[Byte]): Option[Address] =
    Option(bytes).filter(_.nonEmpty).map(raw => new String(raw, Charsets.UTF_8)).flatMap {
      case AddressFromURIString(address) => Some(address)
      case _ => None
    }

  /**
   * Registers this node as an election candidate and starts the three follow-up activities:
   * waiting for leadership, tracking the elected leader and tracking membership.
   */
  private def joinElection(): Unit =
    // `child` makes the sequential znode a CHILD of the leadership node (without it the node is
    // created next to it), and the payload is our address so membership can decode every child.
    leadershipNode.create(data = addressBytes, mode = CreateMode.EPHEMERAL_SEQUENTIAL, child = Some("member-")).respond {
      case Return(mine) =>
        monitorAhead(mine.name)
        identifyLeader()
        membership()
      case Throw(ex) =>
        log.error(ex, s"zk cluster: cannot join the election under $zkNamespace/leadership")
    }

  /**
   * Verifies whether `me` is the least child among all candidates and, if so, publishes this
   * node as leader. Otherwise watches the closest smaller candidate and repeats once that
   * candidate changes or disappears. The children are re-read on every round so a vanished
   * predecessor is never selected again.
   */
  private def monitorAhead(me: String): Unit =
    leadershipNode.getChildren().respond {
      case Return(candidates) =>
        candidates.children.map(_.name).filter(_ < me).sorted.lastOption match {
          case None =>
            //i'm the leader! great -- publish on the node identifyLeader reads from
            leadershipNode.setData(addressBytes, candidates.stat.getVersion).respond {
              case Return(_) => ()
              case Throw(ex) => log.error(ex, "zk cluster: cannot publish leadership")
            }
          case Some(ahead) =>
            //i'm waiting for `ahead` to pass leadership to me
            leadershipNode(ahead).exists.watch().respond {
              case Return(watched) =>
                watched.result match {
                  case Return(_) => watched.update.respond(_ => monitorAhead(me))
                  case Throw(_) => monitorAhead(me) // already gone, look again
                }
              case Throw(ex) =>
                log.error(ex, s"zk cluster: cannot watch election candidate $ahead")
            }
        }
      case Throw(ex) =>
        log.error(ex, "zk cluster: cannot list election candidates")
    }

  /**
   * Reads the leader address stored on the leadership node, reports it to the cluster actor and
   * re-reads only after the watch signals a change.
   */
  private def identifyLeader(): Unit =
    leadershipNode.getData.watch().respond {
      case Return(watched) =>
        watched.result match {
          case Return(data) =>
            // empty until the first leader has published itself
            decodeAddress(data.bytes).foreach(leader => zkClusterActor ! ZkLeaderElected(leader))
          case Throw(ex) =>
            log.warning("zk cluster: cannot read leader data: {}", ex)
        }
        watched.update.respond(_ => identifyLeader())
      case Throw(ex) =>
        log.error(ex, "zk cluster: cannot watch leader data")
    }

  /**
   * Reports the addresses of all election candidates to the cluster actor and repeats whenever
   * the set of candidates changes.
   */
  private def membership(): Unit =
    leadershipNode.getChildren.watch().respond {
      case Return(watched) =>
        watched.result match {
          case Return(candidates) =>
            Future.collect(candidates.children.map(child => child.getData())).respond {
              case Return(payloads) =>
                zkClusterActor ! ZkMembersChanged(payloads.flatMap(payload => decodeAddress(payload.bytes)))
              case Throw(ex) =>
                // a candidate may vanish between listing and reading; the watch triggers a re-read
                log.warning("zk cluster: cannot read member data: {}", ex)
            }
          case Throw(ex) =>
            log.warning("zk cluster: cannot list members: {}", ex)
        }
        watched.update.respond(_ => membership())
      case Throw(ex) =>
        log.error(ex, "zk cluster: cannot watch members")
    }

  /**
   * Publishes the current partition assignments and repeats on every event delivered for the
   * partition node's children watch, whatever its type.
   */
  private def watchPartitions(): Unit =
    // NOTE(review): only the partition node itself is watched; members registering below an
    // existing partition do not trigger a refresh.
    partitionNode.getChildren.watch().respond {
      case Return(watched) =>
        watched.result match {
          case Return(partitions) => publishPartitions(partitions.children)
          case Throw(ex) => log.warning("zk cluster: cannot list partitions: {}", ex)
        }
        watched.update.respond(_ => watchPartitions())
      case Throw(ex) =>
        log.error(ex, "zk cluster: cannot watch partitions")
    }

  /** Reads key and member addresses of every partition and sends them as one snapshot. */
  private def publishPartitions(partitions: Seq[ZNode]): Unit = {
    val assignments = partitions.map { partition =>
      for {
        key <- partition.getData()
        assigned <- partition.getChildren()
        payloads <- Future.collect(assigned.children.map(child => child.getData()))
      } yield ByteString(key.bytes) -> payloads.flatMap(payload => decodeAddress(payload.bytes))
    }
    Future.collect(assignments).respond {
      case Return(pairs) => zkClusterActor ! ZkPartitionsChanged(pairs.toMap)
      case Throw(ex) => log.warning("zk cluster: cannot read partition assignments: {}", ex)
    }
  }
}
/** State names of the [[ZkClusterActor]] state machine. */
private[cluster] sealed trait ZkClusterState

/** Initial state, held until the first `ZkLeaderElected` notification is handled. */
private[cluster] case object ZkClusterUninitialized extends ZkClusterState

/** Entered when the elected leader's address equals this actor's own address. */
private[cluster] case object ZkClusterActiveAsLeader extends ZkClusterState

/** Entered when the elected leader's address differs from this actor's own address. */
private[cluster] case object ZkClusterActiveAsFollower extends ZkClusterState
/**
 * State data carried by the [[ZkClusterActor]] state machine.
 *
 * @param namespace           root znode path of the cluster
 * @param leader              address of the last leader reported, if any
 * @param members             addresses of the last membership snapshot reported
 * @param partitionsToMembers partition key to the addresses assigned to that partition
 */
private[cluster] case class ZkClusterData(
  namespace: String,
  leader: Option[Address],
  members: Seq[Address],
  partitionsToMembers: mutable.Map[ByteString, Seq[Address]]
)
/** Notification to the cluster actor carrying the address read from the leadership znode. */
private[cluster] case class ZkLeaderElected(address: Address)

/** Notification to the cluster actor carrying the decoded addresses of all leadership children. */
private[cluster] case class ZkMembersChanged(members: Seq[Address])

/** Notification to the cluster actor carrying a snapshot of partition key to assigned addresses. */
private[cluster] case class ZkPartitionsChanged(partitions: Map[ByteString, Seq[Address]])

/**
 * Sent by the leader to each member selected for a newly created partition.
 *
 * @param partitionKey key of the partition
 * @param zkPath       path of the partition znode as returned by its creation
 */
private[cluster] case class ZkPartitionAssigned(partitionKey: ByteString, zkPath: String)
/** Query for the current leader; answered with [[ZkLeadership]] when a leader is known. */
case object ZkQueryLeadership

/** Reply to [[ZkQueryLeadership]]. */
case class ZkLeadership(address: Address)

/** Query for the current members; answered with [[ZkMembership]]. */
case object ZkQueryMembership

/** Reply to [[ZkQueryMembership]]. */
case class ZkMembership(members: Seq[Address])

/**
 * What to do when a queried partition has no members.
 *
 * @param create whether the partition should be created
 * @param size   upper bound on the number of members selected for the new partition
 */
case class ZkQueryPartitionOnMiss(create: Boolean, size: Int)

/**
 * Query for the members of a partition.
 *
 * @param partitionKey key of the partition
 * @param onMiss       optional instruction applied when the partition has no members
 */
case class ZkQueryPartition(partitionKey: ByteString, onMiss: Option[ZkQueryPartitionOnMiss] = None)

/**
 * Reply to [[ZkQueryPartition]].
 *
 * @param members addresses assigned to the partition, empty when unknown
 * @param origin  the query being answered
 */
case class ZkPartition(members: Seq[Address], origin: ZkQueryPartition)
/**
 * State machine holding this node's view of the cluster: the elected leader, the members and
 * the partition assignments. The view is fed by `ZkLeaderElected`, `ZkMembersChanged` and
 * `ZkPartitionsChanged` notifications and served through the `ZkQuery*` messages.
 *
 * @param zkClient  ZooKeeper client used to create partition znodes and member registrations
 * @param namespace root znode path of the cluster
 */
class ZkClusterActor(zkClient: ZkClient, namespace: String) extends FSM[ZkClusterState, ZkClusterData] {

  // NOTE(review): taken from this actor's own path; presumably it has no host/port unless the
  // path is remote-qualified -- confirm it matches the addresses published in ZooKeeper.
  private val selfAddress: Address = self.path.address
  private val selfAddressBytes: Array[Byte] = selfAddress.toString.getBytes(Charsets.UTF_8)

  /** Records the elected leader and moves to the matching role, staying put if already there. */
  private val onLeaderElected: StateFunction = {
    case Event(ZkLeaderElected(address), zkClusterData) =>
      val elected = zkClusterData.copy(leader = Some(address))
      val role: ZkClusterState =
        if (address == selfAddress) ZkClusterActiveAsLeader else ZkClusterActiveAsFollower
      if (role == stateName) stay using elected else goto(role) using elected
  }

  /** Keeps members and partition assignments current; also applied before a leader is known. */
  private val onClusterChanged: StateFunction = {
    case Event(ZkMembersChanged(members), zkClusterData) =>
      stay using zkClusterData.copy(members = members)
    case Event(ZkPartitionsChanged(partitions), zkClusterData) =>
      //TODO partition rebalance as cluster's leader
      stay using zkClusterData.copy(partitionsToMembers = mutable.HashMap(partitions.toSeq: _*))
  }

  private val answerMembership: StateFunction = {
    case Event(ZkQueryMembership, zkClusterData) =>
      sender ! ZkMembership(zkClusterData.members)
      stay
  }

  private val answerLeadership: StateFunction = {
    case Event(ZkQueryLeadership, zkClusterData) =>
      zkClusterData.leader.foreach(address => sender ! ZkLeadership(address))
      stay
  }

  /** Registers this node below the partition znode it has been assigned to. */
  private val onPartitionAssigned: StateFunction = {
    case Event(ZkPartitionAssigned(_, zkPath), _) =>
      // `zkPath` is the full path returned by the partition's creation, so it is used as is;
      // `child` makes the registration a child of the partition znode.
      zkClient(zkPath).create(selfAddressBytes, mode = CreateMode.EPHEMERAL_SEQUENTIAL, child = Some("member-"))
      stay
  }

  /** Followers pass create-on-miss queries to the leader and answer all others locally. */
  private val followerPartitionQuery: StateFunction = {
    case Event(query @ ZkQueryPartition(_, Some(ZkQueryPartitionOnMiss(true, _))), zkClusterData) =>
      val requester = sender
      zkClusterData.leader.foreach { address =>
        // explicit sender so that the leader replies to the original requester
        context.actorSelection(ActorPath.fromString(self.path.toStringWithAddress(address))).tell(query, requester)
      }
      stay
    case Event(query @ ZkQueryPartition(key, _), zkClusterData) =>
      sender ! ZkPartition(zkClusterData.partitionsToMembers.getOrElse(key, Seq()), query)
      stay
  }

  /** The leader answers from its view, creating the partition first when asked to on a miss. */
  private val leaderPartitionQuery: StateFunction = {
    case Event(query: ZkQueryPartition, zkClusterData) =>
      val members = zkClusterData.partitionsToMembers.getOrElse(query.partitionKey, Seq())
      query.onMiss match {
        case Some(ZkQueryPartitionOnMiss(true, size)) if members.isEmpty =>
          createPartition(query, size, zkClusterData, sender)
        case _ =>
          sender ! ZkPartition(members, query)
      }
      stay
  }

  /**
   * Creates the partition znode, assigns it to the `size` members holding the fewest partitions
   * and replies to `requester` with that selection (or with no members if the create fails).
   * Everything that depends on actor state is computed here, on the actor's thread; the
   * ZooKeeper callback only touches the captured values.
   */
  private def createPartition(query: ZkQueryPartition,
                              size: Int,
                              zkClusterData: ZkClusterData,
                              requester: ActorRef): Unit = {
    // NOTE(review): a second create-on-miss query for the same key arriving before the next
    // ZkPartitionsChanged snapshot creates a second partition znode.
    val assignedCount = zkClusterData.partitionsToMembers.values.flatten
      .groupBy(identity)
      .map { case (address, hits) => address -> hits.size }
    val selection = zkClusterData.members.sortBy(member => assignedCount.getOrElse(member, 0)).take(size)
    val targets = selection.map(address => ActorPath.fromString(self.path.toStringWithAddress(address)))
    val system = context.system
    val logger = log

    zkClient(s"$namespace/partition")
      .create(data = query.partitionKey.toArray, mode = CreateMode.PERSISTENT_SEQUENTIAL, child = Some("partition-"))
      .respond {
        case Return(created) =>
          targets.foreach(target => system.actorSelection(target) ! ZkPartitionAssigned(query.partitionKey, created.path))
          requester ! ZkPartition(selection, query)
        case Throw(ex) =>
          logger.error(ex, "zk cluster: cannot create partition")
          requester ! ZkPartition(Seq(), query)
      }
  }

  startWith(ZkClusterUninitialized, ZkClusterData(namespace, None, Seq(), mutable.Map.empty))

  when(ZkClusterUninitialized)(
    onLeaderElected orElse onClusterChanged orElse answerMembership)

  when(ZkClusterActiveAsFollower)(
    onLeaderElected orElse onClusterChanged orElse answerLeadership orElse answerMembership orElse
      followerPartitionQuery orElse onPartitionAssigned)

  when(ZkClusterActiveAsLeader)(
    onLeaderElected orElse onClusterChanged orElse answerLeadership orElse answerMembership orElse
      leaderPartitionQuery orElse onPartitionAssigned)
}
/**
 * Extension id of [[ZkCluster]]. The extension is configured through the
 * `zkCluster.connectionString` and `zkCluster.namespace` settings.
 */
object ZkCluster extends ExtensionId[ZkCluster] with ExtensionIdProvider {

  override def lookup(): ExtensionId[_ <: Extension] = ZkCluster

  /**
   * Builds the extension for `system`.
   *
   * The settings are read from the actor system's own configuration, so that a configuration
   * handed to the system at creation time is honoured rather than bypassed.
   */
  override def createExtension(system: ExtendedActorSystem): ZkCluster = {
    val configuration = system.settings.config
    new ZkCluster(
      system,
      configuration.getString("zkCluster.connectionString"),
      configuration.getString("zkCluster.namespace"))
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment