Skip to content

Instantly share code, notes, and snippets.

@inexplicable
Created March 26, 2014 00:59
Show Gist options
  • Save inexplicable/9774903 to your computer and use it in GitHub Desktop.
Save inexplicable/9774903 to your computer and use it in GitHub Desktop.
Cluster backed by ZooKeeper (zk)
package org.squbs.samples.pubsub.cluster
import akka.actor._
import com.twitter.zk.{ZNode, ZkClient, RetryPolicy}
import com.twitter.util._
import java.util.concurrent.TimeUnit
import scala.collection.mutable
import com.typesafe.config.ConfigFactory
import org.apache.zookeeper.CreateMode
import com.google.common.base.Charsets
import akka.util.ByteString
import scala.Some
import com.twitter.zk.RetryPolicy.Exponential
import com.twitter.util.Throw
import org.apache.zookeeper.Watcher.Event.EventType
/**
 * Akka extension that joins this actor system to a ZooKeeper-backed cluster rooted at `zkNamespace`.
 *
 * On construction it makes sure the persistent znodes `zkNamespace`, `zkNamespace/leadership` and
 * `zkNamespace/partition` exist, registers this node as an ephemeral sequential child of `leadership`
 * (holding this node's address), runs a leader election over those children, and keeps the local
 * [[ZkClusterActor]] informed with [[ZkLeaderElected]], [[ZkMembersChanged]] and [[ZkPartitionsChanged]].
 *
 * Created by huzhou on 3/25/14.
 *
 * @param system              actor system that hosts the `zkCluster` actor and receives failure logs
 * @param zkConnectionString  ZooKeeper connect string
 * @param zkNamespace         absolute path of the cluster's root znode
 * @param zkConnectionTimeout timeout for establishing the ZooKeeper connection
 * @param zkSessionTimeout    ZooKeeper session timeout
 * @param retryPolicy         retry policy applied to the ZooKeeper client
 */
class ZkCluster(system:ActorSystem,
                zkConnectionString:String,
                zkNamespace:String,
                zkConnectionTimeout:Duration = Duration(1L, TimeUnit.SECONDS),
                zkSessionTimeout:Duration = Duration(10L, TimeUnit.SECONDS),
                retryPolicy:RetryPolicy = Exponential(Duration(1L, TimeUnit.SECONDS))) extends Extension {

  val zkClient = ZkClient(zkConnectionString, Some(zkConnectionTimeout), zkSessionTimeout).withRetryPolicy(retryPolicy)
  val zkClusterActor = system.actorOf(Props.create(classOf[ZkClusterActor], zkClient, zkNamespace), "zkCluster")
  // This node's identity: stored in its leadership child and, when elected, on the leadership znode itself.
  val zkClusterAddress = zkClusterActor.path.address

  // Must be initialized before the bootstrap statements below, whose callbacks may run immediately.
  private val leadershipPath = s"$zkNamespace/leadership"
  private val partitionPath = s"$zkNamespace/partition"
  private val addressBytes = zkClusterAddress.toString.getBytes(Charsets.UTF_8)

  // Bootstrap the persistent znode layout; `leadership` and `partition` are only created once the namespace exists.
  val namespacePromise = Promise[ZNode]()
  ensurePersistent(zkNamespace).proxyTo(namespacePromise)

  val leadershipPromise = Promise[ZNode]()
  namespacePromise.flatMap(_ => ensurePersistent(leadershipPath)).proxyTo(leadershipPromise)

  val partitionPromise = Promise[ZNode]()
  namespacePromise.flatMap(_ => ensurePersistent(partitionPath)).proxyTo(partitionPromise)

  leadershipPromise.onFailure(logFailure(s"creating $leadershipPath"))
  partitionPromise.onFailure(logFailure(s"creating $partitionPath"))

  // Register as a member and candidate. The trailing "/" makes ZooKeeper create the sequential node as a
  // *child* of `leadership`; its data is our address so that membership can be decoded from the children.
  leadershipPromise.onSuccess { _ =>
    zkClient(s"$leadershipPath/")
      .create(addressBytes, mode = CreateMode.EPHEMERAL_SEQUENTIAL)
      .onSuccess { me =>
        campaign(me.name)
        watchLeader()
        watchMembers()
      }
      .onFailure(logFailure("joining the leadership election"))
  }

  partitionPromise.onSuccess(_ => watchPartitions())

  /** Creates a persistent znode at `path`, falling back to the existing node when creation fails. */
  private def ensurePersistent(path: String): Future[ZNode] =
    zkClient(path).create(mode = CreateMode.PERSISTENT).rescue {
      // Usually "node exists"; if the node is genuinely missing, the `exists` call fails and surfaces the error.
      case _ => zkClient(path).exists().flatMap(exists => exists.sync())
    }

  /** Decodes an address previously written as UTF-8 `Address.toString`. */
  private def decodeAddress(bytes: Array[Byte]): Address =
    AddressFromURIString(new String(bytes, Charsets.UTF_8))

  /** Failure callback that logs `what` through the actor system's logger. */
  private def logFailure(what: String): Throwable => Unit =
    ex => system.log.error(ex, "zkCluster: {} failed", what)

  /**
   * Leader election over the sequential children of `leadership`: the lowest name wins and writes its address
   * onto the `leadership` znode. Any other candidate watches only its immediate predecessor, and re-reads the
   * children (never a stale snapshot) once that predecessor changes or turns out to be gone already.
   *
   * @param me name of this node's sequential child under `leadership`
   */
  private def campaign(me: String): Unit =
    zkClient(leadershipPath).getChildren().onSuccess { current =>
      current.children.map(_.name).filter(_ < me).sorted.lastOption match {
        case None =>
          zkClient(leadershipPath).setData(addressBytes, current.stat.getVersion)
            .onFailure(logFailure("publishing leadership"))
        case Some(ahead) =>
          zkClient(s"$leadershipPath/$ahead").exists.watch().onSuccess { watched =>
            watched.result match {
              case Return(_) => watched.update.ensure(campaign(me))
              case Throw(_) => campaign(me) // predecessor already gone: re-read the candidates
            }
          }.onFailure(logFailure("watching the leadership predecessor"))
      }
    }.onFailure(logFailure("reading leadership candidates"))

  /** Sends the address stored on the `leadership` znode to the actor, re-arming the watch after every change. */
  private def watchLeader(): Unit =
    zkClient(leadershipPath).getData.watch().onSuccess { watched =>
      // The znode is created without explicit data, so skip it until a leader has published itself.
      watched.result.onSuccess { data =>
        if (data.bytes.nonEmpty) zkClusterActor ! ZkLeaderElected(decodeAddress(data.bytes))
      }
      watched.update.ensure(watchLeader())
    }.onFailure(logFailure("watching the leader"))

  /** Sends the addresses held by the `leadership` children to the actor, re-arming the watch after every change. */
  private def watchMembers(): Unit =
    zkClient(leadershipPath).getChildren.watch().onSuccess { watched =>
      watched.result.onSuccess { current =>
        Future.collect(current.children.map(_.getData())).onSuccess { members =>
          zkClusterActor ! ZkMembersChanged(members.collect {
            case member if member.bytes.nonEmpty => decodeAddress(member.bytes)
          })
        }
      }
      watched.update.ensure(watchMembers())
    }.onFailure(logFailure("watching members"))

  /** Re-reads all partition assignments whenever the set of partition znodes changes (any event type). */
  private def watchPartitions(): Unit =
    zkClient(partitionPath).getChildren.watch().onSuccess { watched =>
      watched.result.onSuccess(current => refreshPartitions(current))
      watched.update.ensure(watchPartitions())
    }.onFailure(logFailure("watching partitions"))

  /**
   * Builds the partition-key -> members map and sends it to the actor. Each partition znode holds its key as
   * data; its children hold the addresses of the members assigned to it.
   * NOTE(review): only the set of partitions is watched, so later assignments under an existing partition are
   * picked up at the next partition-set change — confirm whether per-partition watches are wanted.
   */
  private def refreshPartitions(partitions: ZNode.Children): Unit = {
    val assignments = partitions.children.map { partition =>
      for {
        key <- partition.getData()
        assigned <- partition.getChildren()
        members <- Future.collect(assigned.children.map(_.getData()))
      } yield ByteString(key.bytes) -> members.map(member => decodeAddress(member.bytes))
    }
    Future.collect(assignments).onSuccess { pairs =>
      zkClusterActor ! ZkPartitionsChanged(pairs.toMap)
    }.onFailure(logFailure("reading partitions"))
  }
}
/**
 * Lifecycle states of [[ZkClusterActor]]:
 *  - [[ZkClusterUninitialized]]: no leader has been announced yet;
 *  - [[ZkClusterActiveAsLeader]]: this node's address is the announced leader;
 *  - [[ZkClusterActiveAsFollower]]: another node's address is the announced leader.
 */
private[cluster] sealed trait ZkClusterState

/** Initial state: waiting for the first leader announcement. */
private[cluster] case object ZkClusterUninitialized extends ZkClusterState

/** This node leads the cluster. */
private[cluster] case object ZkClusterActiveAsLeader extends ZkClusterState

/** Some other node leads the cluster. */
private[cluster] case object ZkClusterActiveAsFollower extends ZkClusterState
/**
 * State data of [[ZkClusterActor]].
 *
 * @param namespace           root znode path of the cluster
 * @param leader              address of the announced leader, if any
 * @param members             addresses of the current members
 * @param partitionsToMembers partition key -> addresses of the members serving it
 *                            NOTE(review): a mutable map inside FSM state data; an immutable Map would be safer.
 */
private[cluster] final case class ZkClusterData(namespace: String,
                                                leader: Option[Address],
                                                members: Seq[Address],
                                                partitionsToMembers: mutable.Map[ByteString, Seq[Address]])

/** A leader address was read from ZooKeeper. */
private[cluster] final case class ZkLeaderElected(address:Address)

/** The full, current list of member addresses. */
private[cluster] final case class ZkMembersChanged(members:Seq[Address])

/** The full, current partition-key -> members assignment. */
private[cluster] final case class ZkPartitionsChanged(partitions:Map[ByteString, Seq[Address]])

/**
 * Sent by the leader to each member it selected for a newly created partition.
 *
 * @param partitionKey key of the new partition
 * @param zkPath       path of the partition znode as returned by ZooKeeper on creation
 */
private[cluster] final case class ZkPartitionAssigned(partitionKey:ByteString, zkPath:String)
/** Asks the `zkCluster` actor for the leader; answered with [[ZkLeadership]] once a leader is known. */
case object ZkQueryLeadership

/** Reply to [[ZkQueryLeadership]]. */
final case class ZkLeadership(address:Address)

/** Asks the `zkCluster` actor for the current members; answered with [[ZkMembership]]. */
case object ZkQueryMembership

/** Reply to [[ZkQueryMembership]]. */
final case class ZkMembership(members:Seq[Address])

/**
 * What to do when a queried partition has no members.
 *
 * @param create whether the leader should create the partition
 * @param size   maximum number of members to assign to a newly created partition
 */
final case class ZkQueryPartitionOnMiss(create:Boolean, size:Int)

/** Asks which members serve `partitionKey`; answered with [[ZkPartition]]. */
final case class ZkQueryPartition(partitionKey:ByteString, onMiss:Option[ZkQueryPartitionOnMiss] = None)

/** Reply to [[ZkQueryPartition]]; `origin` is the query being answered. */
final case class ZkPartition(members:Seq[Address], origin:ZkQueryPartition)
/**
 * Finite-state machine holding this node's view of the ZooKeeper-backed cluster.
 *
 * It receives [[ZkLeaderElected]], [[ZkMembersChanged]] and [[ZkPartitionsChanged]] and answers
 * [[ZkQueryLeadership]], [[ZkQueryMembership]] and [[ZkQueryPartition]]. When the announced leader is this node,
 * it moves to [[ZkClusterActiveAsLeader]]; that is the only state that creates partitions. Otherwise it moves to
 * [[ZkClusterActiveAsFollower]], which forwards partition-creating queries to the leader.
 *
 * @param zkClient  client used to create partition and assignment znodes
 * @param namespace root znode path; new partitions are created under `namespace/partition`
 */
class ZkClusterActor(zkClient:ZkClient, namespace:String) extends FSM[ZkClusterState, ZkClusterData] {

  // Compared with announced leader addresses, which ZkCluster derives from `zkClusterActor.path.address`.
  // NOTE(review): for a local ActorRef this address has no host/port, so nodes whose actor systems share a
  // name would all match it — confirm how node addresses are meant to be made unique.
  private val selfAddress: Address = self.path.address

  /** Path of the `ZkClusterActor` on the node at `address` (it is registered under the same path everywhere). */
  private def peerPath(address: Address): ActorPath =
    ActorPath.fromString(self.path.toStringWithAddress(address))

  /** Records `leader` and moves to the leader or follower state depending on whether it is this node. */
  private def followLeader(leader: Address, snapshot: ZkClusterData): State = {
    val updated = snapshot.copy(leader = Some(leader))
    if (leader == selfAddress) goto(ZkClusterActiveAsLeader).using(updated)
    else goto(ZkClusterActiveAsFollower).using(updated)
  }

  /** Registers this node as a member of the partition znode at absolute path `zkPath`. */
  private def joinPartition(zkPath: String): Unit = {
    val logger = log // captured on the actor thread; the failure callback runs elsewhere
    // The trailing "/" makes ZooKeeper create the sequential node as a *child* of the partition znode,
    // which is where ZkCluster reads a partition's members from.
    zkClient(s"$zkPath/")
      .create(selfAddress.toString.getBytes(Charsets.UTF_8), mode = CreateMode.EPHEMERAL_SEQUENTIAL)
      .onFailure(ex => logger.error(ex, "failed to join partition {}", zkPath))
  }

  /**
   * Leader only: creates a persistent sequential partition znode holding `query.partitionKey`. It then tells the
   * `size` least-loaded members to join it and replies to the requester with that selection (or with no members
   * if creation failed).
   * NOTE(review): concurrent queries for the same missing key, before its assignment is visible, create
   * duplicate partitions.
   */
  private def createPartition(query: ZkQueryPartition, size: Int, snapshot: ZkClusterData): Unit = {
    // Everything the ZooKeeper callback needs is captured here: `sender` and `context` must not be used from it.
    val requester = sender
    val system = context.system
    val logger = log
    val load: Map[Address, Int] = snapshot.partitionsToMembers.values.flatten.groupBy(identity).map {
      case (member, occurrences) => (member, occurrences.size)
    }
    // Least-loaded first; the sort is stable, so ties keep membership order.
    val selection = snapshot.members.sortBy(member => load.getOrElse(member, 0)).take(size)
    zkClient(s"$namespace/partition/")
      .create(data = query.partitionKey.toArray, mode = CreateMode.PERSISTENT_SEQUENTIAL)
      .respond {
        case Return(created) =>
          selection.foreach { member =>
            system.actorSelection(peerPath(member)) ! ZkPartitionAssigned(query.partitionKey, created.path)
          }
          requester ! ZkPartition(selection, query)
        case Throw(ex) =>
          logger.error(ex, "failed to create partition under {}", s"$namespace/partition")
          requester ! ZkPartition(Seq(), query)
      }
  }

  /** Leader announcements, handled identically in every state. */
  private val onElection: StateFunction = {
    case Event(ZkLeaderElected(address), snapshot) => followLeader(address, snapshot)
  }

  /** Keeps the membership and partition view current, answers view queries and joins assigned partitions. */
  private val trackView: StateFunction = {
    case Event(ZkMembersChanged(members), snapshot) =>
      stay using snapshot.copy(members = members)
    case Event(ZkPartitionsChanged(partitions), snapshot) =>
      //TODO partition rebalance as cluster's leader
      stay using snapshot.copy(partitionsToMembers = mutable.HashMap.empty[ByteString, Seq[Address]] ++= partitions)
    case Event(ZkQueryMembership, snapshot) =>
      sender ! ZkMembership(snapshot.members)
      stay
    case Event(ZkQueryLeadership, snapshot) =>
      snapshot.leader.foreach(address => sender ! ZkLeadership(address))
      stay
    case Event(ZkPartitionAssigned(_, zkPath), _) =>
      joinPartition(zkPath)
      stay
  }

  /** Follower: answers from the local view; unknown partitions that should be created go to the leader. */
  private val followerQueries: StateFunction = {
    case Event(query @ ZkQueryPartition(key, Some(ZkQueryPartitionOnMiss(true, _))), snapshot)
        if snapshot.partitionsToMembers.getOrElse(key, Seq()).isEmpty =>
      // Keep the original sender so the leader replies straight to the requester.
      snapshot.leader.foreach(address => context.actorSelection(peerPath(address)).tell(query, sender))
      stay
    case Event(query: ZkQueryPartition, snapshot) =>
      sender ! ZkPartition(snapshot.partitionsToMembers.getOrElse(query.partitionKey, Seq()), query)
      stay
  }

  /** Leader: answers from the local view, creating the partition when asked to and it has no members. */
  private val leaderQueries: StateFunction = {
    case Event(query: ZkQueryPartition, snapshot) =>
      val members = snapshot.partitionsToMembers.getOrElse(query.partitionKey, Seq())
      query.onMiss match {
        case Some(ZkQueryPartitionOnMiss(true, size)) if members.isEmpty => createPartition(query, size, snapshot)
        case _ => sender ! ZkPartition(members, query)
      }
      stay
  }

  startWith(ZkClusterUninitialized, ZkClusterData(namespace, None, Seq(), mutable.Map.empty))

  when(ZkClusterUninitialized)(onElection orElse trackView)

  when(ZkClusterActiveAsFollower)(onElection orElse trackView orElse followerQueries)

  when(ZkClusterActiveAsLeader)(onElection orElse trackView orElse leaderQueries)

  initialize()
}
/**
 * Extension id for [[ZkCluster]]; obtain the extension with `ZkCluster(system)`.
 *
 * Reads `zkCluster.connectionString` and `zkCluster.namespace` from the actor system's configuration.
 */
object ZkCluster extends ExtensionId[ZkCluster] with ExtensionIdProvider {
  override def lookup(): ExtensionId[_ <: Extension] = ZkCluster

  override def createExtension(system: ExtendedActorSystem): ZkCluster = {
    // Use the system's own settings: a system created with a custom Config must see its own `zkCluster`
    // section, not whatever ConfigFactory.load finds on the classpath.
    val configuration = system.settings.config
    new ZkCluster(system,
      configuration.getString("zkCluster.connectionString"),
      configuration.getString("zkCluster.namespace"))
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment