ZooKeeper-based Akka cluster management
import com.typesafe.sbt.SbtMultiJvm
import com.typesafe.sbt.SbtMultiJvm.MultiJvmKeys.MultiJvm
name := "akka-zk-cluster"
version := "1.0"
resolvers += "spray repo" at "http://repo.spray.io"
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % "2.3.1",
"com.typesafe.akka" %% "akka-remote" % "2.3.1",
"com.typesafe.akka" %% "akka-testkit" % "2.3.1",
"com.typesafe.akka" %% "akka-multi-node-testkit" % "2.3.1",
"org.scalatest" %% "scalatest" % "2.1.0",
//"io.spray" % "spray-can" % "1.3.1",
//"io.spray" % "spray-routing" % "1.3.1",
//"io.spray" % "spray-testkit" % "1.3.1",
//"net.databinder.dispatch" %% "dispatch-core" % "0.11.0",
"com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.3.2",
"org.apache.curator" % "curator-recipes" % "2.4.1",
"org.apache.curator" % "curator-framework" % "2.4.1",
"org.apache.curator" % "curator-client" % "2.4.1" exclude("org.jboss.netty", "netty")
)
SbtMultiJvm.multiJvmSettings
compile in MultiJvm <<= (compile in MultiJvm) triggeredBy (compile in Test)
package org.squbs.cluster
import java.io.File
import java.nio.ByteBuffer
import com.google.common.base.Charsets
import java.net.{NetworkInterface, URLDecoder, URLEncoder, InetAddress}
import org.apache.zookeeper.KeeperException.{NoNodeException, NodeExistsException}
import org.apache.zookeeper.{WatchedEvent, CreateMode}
import org.apache.zookeeper.Watcher.Event.EventType
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.curator.RetryPolicy
import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
import org.apache.curator.framework.recipes.leader.LeaderLatch
import org.apache.curator.framework.state.{ConnectionState, ConnectionStateListener}
import org.apache.curator.framework.api._
import scala.Some
import scala.annotation.tailrec
import scala.collection.mutable
import scala.collection.JavaConversions._
import scala.concurrent.duration._
import akka.actor._
import akka.util.ByteString
import com.typesafe.config.ConfigFactory
import com.typesafe.scalalogging.slf4j.Logging
/**
* Created by huzhou on 3/25/14.
*/
class ZkCluster(system: ActorSystem,
val zkAddress: Address,
zkConnectionString: String,
zkNamespace: String,
retryPolicy: RetryPolicy = new ExponentialBackoffRetry(1000, 3),
rebalanceLogic:RebalanceLogic = ZkCluster.DefaultRebalanceLogic) extends Extension with Logging {
import ZkCluster._
private[this] var zkClient = CuratorFrameworkFactory.newClient(zkConnectionString, retryPolicy)
zkClient.getConnectionStateListenable.addListener(new ConnectionStateListener {
override def stateChanged(client: CuratorFramework, newState: ConnectionState): Unit = {
newState match {
case ConnectionState.LOST =>
logger.error("[zkCluster] connection lost!")
zkClient = CuratorFrameworkFactory.newClient(zkConnectionString, retryPolicy)
zkClient.getConnectionStateListenable.addListener(this)
zkClient.start
zkClusterActor ! ZkClientUpdated(zkClientWithNs)
zkMembershipMonitor ! ZkClientUpdated(zkClientWithNs)
zkPartitionsManager ! ZkClientUpdated(zkClientWithNs)
case _ =>
}
}
})
zkClient.start
//this is the zk client that we'll use, using the namespace reserved throughout
implicit def zkClientWithNs = zkClient.usingNamespace(zkNamespace)
//make sure /leader, /members, /partitions, /sizes znodes are available
guarantee("/leader", Some(Array[Byte]()), CreateMode.PERSISTENT)
guarantee("/members", Some(Array[Byte]()), CreateMode.PERSISTENT)
guarantee("/partitions",Some(Array[Byte]()), CreateMode.PERSISTENT)
guarantee("/sizes", Some(Array[Byte]()), CreateMode.PERSISTENT)
//all interactions with the zk cluster extension should be through the zkClusterActor below
val zkClusterActor = system.actorOf(Props.create(classOf[ZkClusterActor], zkClientWithNs, zkAddress, rebalanceLogic), "zkCluster")
//begin the process of electing a leader
private[cluster] val zkMembershipMonitor = system.actorOf(
Props(classOf[ZkMembershipMonitor], zkClientWithNs, zkClusterActor, zkAddress, new LeaderLatch(zkClientWithNs, "/leadership")).withDispatcher("pinned-dispatcher"), "zkMembership")
//begin the process of partitioning management
private[cluster] val zkPartitionsManager = system.actorOf(
Props(classOf[ZkPartitionsManager], zkClientWithNs, zkClusterActor, zkAddress), "zkPartitions")
}
private[cluster] sealed trait ZkClusterState
private[cluster] case object ZkClusterUninitialized extends ZkClusterState
private[cluster] case object ZkClusterActiveAsLeader extends ZkClusterState
private[cluster] case object ZkClusterActiveAsFollower extends ZkClusterState
private[cluster] case class ZkClusterData(leader: Option[Address],
members: Set[Address],
partitionsToMembers: Map[ByteString, Set[Address]])
private[cluster] case class ZkLeaderElected(address: Option[Address])
private[cluster] case class ZkMembersChanged(members: Set[Address])
private[cluster] case class ZkRebalance(partitionsToMembers: Map[ByteString, Set[Address]])
private[cluster] case object ZkAcquireLeadership
case object ZkQueryLeadership
case object ZkQueryMembership
case object ZkMonitorClient
case class ZkClientUpdated(zkClient:CuratorFramework)
case class ZkLeadership(address: Address)
case class ZkMembership(members: Set[Address])
case class ZkQueryPartition(partitionKey:ByteString, //partition key
notification:Option[Any] = None, //notify the sender() along with query result
createOnMiss:Option[Int] = None, //if set, create the partition when it's missing; the value is the required partition size
props:Array[Byte] = Array[Byte](), //properties of the partition, plain byte array
members:Set[Address] = Set.empty) //used internally
case class ZkRemovePartition(partitionKey:ByteString)
case class ZkMonitorPartition(whenOnboards:Set[ActorPath] = Set.empty, //notify me when some partition is assigned @see ZkPartitionOnboard
whenDropoffs:Set[ActorPath] = Set.empty, //notify me when some partition is released @see ZkPartitionDropoff
whenChangeds:Set[ActorPath] = Set.empty) //notify me when partitions have changes @see ZkPartitionsChanged
case class ZkStopMonitorPartition(whenOnboards:Set[ActorPath] = Set.empty, //stop notifying me when some partition is assigned @see ZkPartitionOnboard
whenDropoffs:Set[ActorPath] = Set.empty, //stop notifying me when some partition is released @see ZkPartitionDropoff
whenChangeds:Set[ActorPath] = Set.empty) //stop notifying me when partitions have changes @see ZkPartitionsChanged
case class ZkPartition(partitionKey:ByteString,
members: Seq[Address], //who have been assigned to be part of this partition
zkPath:String, //where the partition data is stored
notification:Option[Any])//optional notification when the query was issued
case class ZkPartitionsChanged(partitions: Map[ByteString, Set[Address]])
case class ZkPartitionOnboard(partitionKey: ByteString, zkPath: String)
case class ZkPartitionDropoff(partitionKey: ByteString, zkPath: String)
/**
* the membership monitor has a few responsibilities, most importantly to enroll in the leadership competition and to pick up membership and leadership changes as soon as they happen
* @param zkClient
* @param zkClusterActor
* @param zkAddress
* @param zkLeaderLatch
*/
private[cluster] class ZkMembershipMonitor(implicit var zkClient: CuratorFramework,
zkClusterActor: ActorRef,
zkAddress: Address,
var zkLeaderLatch: LeaderLatch) extends Actor with Logging {
import ZkCluster._
override def preStart = {
//enroll in the leadership competition
zkLeaderLatch.start
//watch over leader changes
val leader = zkClient.getData.usingWatcher(new CuratorWatcher {
override def process(event: WatchedEvent): Unit = {
logger.info("[membership] leader watch event:{}", event)
event.getType match {
case EventType.NodeCreated | EventType.NodeDataChanged =>
zkClusterActor ! ZkLeaderElected(zkClient.getData.usingWatcher(this).forPath("/leader"))
case _ =>
}
}
}).forPath("/leader")
//watch over members changes
val me = guarantee(s"/members/${keyToPath(zkAddress.toString)}", Some(Array[Byte]()), CreateMode.EPHEMERAL)
zkClient.sync.forPath(me)
lazy val members = zkClient.getChildren.usingWatcher(new CuratorWatcher {
override def process(event: WatchedEvent): Unit = {
logger.info("[membership] membership watch event:{}", event)
event.getType match {
case EventType.NodeChildrenChanged =>
refresh(zkClient.getChildren.usingWatcher(this).forPath("/members"))
case _ =>
}
}
}).forPath("/members")
def refresh(members:Seq[String]) = {
zkClusterActor ! ZkMembersChanged(members.map(m => AddressFromURIString(pathToKey(m))).toSet)
}
refresh(members)
self ! ZkAcquireLeadership
zkClusterActor ! ZkLeaderElected(leader)
}
override def postStop = {
//stop the leader latch to quit the competition
zkLeaderLatch.close
}
def receive: Actor.Receive = {
case ZkClientUpdated(updated) =>
zkClient = updated
zkLeaderLatch.close
zkLeaderLatch = new LeaderLatch(zkClient, "/leadership")
zkLeaderLatch.start
case ZkAcquireLeadership =>
//keep trying to acquire leadership; reschedule another attempt whenever the previous one times out
import scala.concurrent.ExecutionContext.Implicits.global
val oneSecond = 1.second
zkLeaderLatch.await(oneSecond.length, oneSecond.unit) match {
case true =>
logger.info("[membership] leadership acquired @ {}", zkAddress)
guarantee("/leader", Some(zkAddress))
case false =>
context.system.scheduler.scheduleOnce(100.millis, self, ZkAcquireLeadership)
}
}
}
/**
* The major responsibility of ZkPartitionsManager is to maintain partitions
* @param zkClient
* @param zkClusterActor
* @param zkAddress
*/
private[cluster] class ZkPartitionsManager(implicit var zkClient: CuratorFramework,
zkClusterActor: ActorRef,
zkAddress: Address) extends Actor with Logging {
import ZkCluster._
var partitionsToMembers = Map.empty[ByteString, Set[Address]]
var notifyWhenOnboards = Set.empty[ActorPath]
var notifyWhenDropoffs = Set.empty[ActorPath]
var notifyWhenChangeds = Set.empty[ActorPath]
override def preStart = {
//watch over changes of creation/removal of any partition (watcher over /partitions)
lazy val watcher: CuratorWatcher = new CuratorWatcher {
override def process(event: WatchedEvent): Unit = {
event.getType match {
case EventType.NodeChildrenChanged =>
self ! ZkPartitionsChanged(refresh(zkClient.getChildren.usingWatcher(this).forPath("/partitions"), partitionWatcher))
case _ =>
}
}
}
//watch over changes of members of a partition (watcher over /partitions/some-partition)
lazy val partitionWatcher: CuratorWatcher = new CuratorWatcher {
override def process(event: WatchedEvent): Unit = {
event.getType match {
case EventType.NodeChildrenChanged =>
self ! ZkPartitionsChanged(refresh(zkClient.getChildren.forPath("/partitions"), this))
case _ =>
}
}
}
def refresh(partitions: Seq[String], partitionWatcher:CuratorWatcher): Map[ByteString, Set[Address]] = {
partitions.map(partitionZNode => {
ByteString(pathToKey(partitionZNode)) -> (try {
zkClient.getChildren.usingWatcher(partitionWatcher).forPath(s"/partitions/$partitionZNode")
.map(memberZNode => AddressFromURIString(pathToKey(memberZNode))).toSet
//each child znode name under the partition encodes the Akka address of a member serving that partition
}
catch{
case _:NoNodeException => null
case t:Throwable => logger.error("partitions refresh failed due to unknown reason: {}", t); null
})
}).filterNot(_._2 == null).toMap
}
//initialize with the current set of partitions
lazy val partitions = zkClient.getChildren.usingWatcher(watcher).forPath("/partitions")
//initialize partitionsToMembers immediately
self ! ZkPartitionsChanged(refresh(partitions, partitionWatcher))
}
def receive: Actor.Receive = {
case ZkClientUpdated(updated) =>
zkClient = updated
case origin @ ZkPartitionsChanged(change) => //partition changes found in zk
logger.info("[partitions] partitions change detected from zk: {}", change.map(pair => keyToPath(pair._1) -> pair._2))
val onboards = change.keySet.diff(partitionsToMembers.keySet)
val dropoffs = partitionsToMembers.keySet.diff(change.keySet)
//drop off members no longer in the partition
partitionsToMembers = partitionsToMembers.filterKeys(!dropoffs.contains(_))
//update members of partitions already there
partitionsToMembers = partitionsToMembers.map(assign => assign._1 -> (if(change.getOrElse(assign._1, Set.empty).nonEmpty) change(assign._1) else assign._2))
//onboard members of new partitions
partitionsToMembers = partitionsToMembers ++ onboards.map(assign => assign -> change(assign))
logger.info("[partitions] partitions change consolidated: {}", partitionsToMembers.map(pair => keyToPath(pair._1) -> pair._2))
notifyWhenChangeds.foreach(listener => context.actorSelection(listener) ! origin)
case ZkQueryPartition(key, notification, _, _, _) =>
logger.info("[partitions] partition: {} identified", keyToPath(key))
//notification is the attachment part of the partition query, it will allow callback styled message handling at the sender()
sender() ! ZkPartition(key, orderByAge(key, partitionsToMembers.getOrElse(key, Set.empty)), s"/partitions/${keyToPath(key)}", notification)
case ZkRebalance(planned) =>
logger.info("[partitions] rebalance partitions based on plan:{}", planned)
def addressee(address:Address) =
if(address == zkAddress)
context.actorSelection(self.path)
else
context.actorSelection(self.path.toStringWithAddress(address))
planned.foreach(assign => {
val partitionKey = assign._1
val servants = partitionsToMembers.getOrElse(partitionKey, Set.empty)
val onboards = assign._2.diff(servants)
val dropoffs = servants.diff(assign._2)
logger.info("[partitions] onboards:{} and dropoffs:{}", onboards, dropoffs)
onboards.foreach(it => addressee(it) ! ZkPartitionOnboard(partitionKey, s"/partitions/${keyToPath(partitionKey)}"))
dropoffs.foreach(it => addressee(it) ! ZkPartitionDropoff(partitionKey, s"/partitions/${keyToPath(partitionKey)}"))
})
case ZkRemovePartition(key) =>
val partition = keyToPath(key)
safelyDiscard(s"/partitions/$partition")
sender() ! ZkPartition(key, Seq.empty, s"/partitions/${keyToPath(key)}", None)
case ZkMonitorPartition(whenOnboards, whenDropoffs, whenChanged) =>
logger.info("[partitions] monitor partitioning from:{}", sender().path)
notifyWhenOnboards = notifyWhenOnboards ++ whenOnboards
notifyWhenDropoffs = notifyWhenDropoffs ++ whenDropoffs
notifyWhenChangeds = notifyWhenChangeds ++ whenChanged
case ZkStopMonitorPartition(stopWhenOnboards, stopWhenDropoffs, stopWhenChangeds) =>
logger.info("[partitions] stop monitor partitioning from:{}", sender().path)
notifyWhenOnboards = notifyWhenOnboards -- stopWhenOnboards
notifyWhenDropoffs = notifyWhenDropoffs -- stopWhenDropoffs
notifyWhenChangeds = notifyWhenChangeds -- stopWhenChangeds
case origin @ ZkPartitionOnboard(partitionKey, zkPath) => //partition assignment handling
logger.info("[partitions] assignment:{} with zkPath:{}", keyToPath(partitionKey), zkPath)
guarantee(zkPath, None)
//mark acceptance
guarantee(s"$zkPath/${keyToPath(zkAddress.toString)}", Some(Array[Byte]()), CreateMode.EPHEMERAL)
notifyWhenOnboards.foreach(listener => context.actorSelection(listener) ! origin)
case origin @ ZkPartitionDropoff(partitionKey, zkPath) =>
logger.info("[partitions] release:{} with zkPath:{}", keyToPath(partitionKey), zkPath)
safelyDiscard(s"$zkPath/${keyToPath(zkAddress.toString)}")
notifyWhenDropoffs.foreach(listener => context.actorSelection(listener) ! origin)
}
}
trait RebalanceLogic {
/**
* @return partitionsToMembers with additional members assigned to any partition whose in-service member count falls short of the required size
*/
def compensate(partitionsToMembers:Map[ByteString, Set[Address]], members:Seq[Address], size:(ByteString => Int)):Map[ByteString, Set[Address]] = {
partitionsToMembers.map(assign => {
val partitionKey = assign._1
val servants = assign._2
val requires = size(partitionKey)//bytesToInt(zkClient.getData.forPath(s"/sizes/${keyToPath(partitionKey)}"))
if(servants.size < requires)
partitionKey -> (servants ++ members.filterNot(servants.contains(_)).take(requires - servants.size))
else
assign
})
}
/**
* @return partitionsToMembers rebalanced
*/
def rebalance(partitionsToMembers:Map[ByteString, Set[Address]], members:Set[Address]):Map[ByteString, Set[Address]] = {
val utilization = partitionsToMembers.foldLeft(Map.empty[Address, Seq[ByteString]]){(memoize, assign) =>
assign._2.foldLeft(memoize){(memoize, member) =>
memoize.updated(member, memoize.getOrElse(member, Seq.empty) :+ assign._1)
}
}
val ordered = members.toSeq.sortWith((one, two) => utilization.getOrElse(one, Seq.empty).size < utilization.getOrElse(two, Seq.empty).size)
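//repeatedly move one partition from the most loaded member (ordered.last) to the least loaded one (ordered.head),
//re-inserting both members into the utilization-ordered sequence after every move,
//until the load difference between the two ends is at most one partition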
@tailrec def rebalanceRecursively(partitionsToMembers:Map[ByteString, Set[Address]],
utilization:Map[Address, Seq[ByteString]],
ordered:Seq[Address]):Map[ByteString, Set[Address]] = {
val overflows = utilization.getOrElse(ordered.last, Seq.empty)
val underflow = utilization.getOrElse(ordered.head, Seq.empty)
if (overflows.size - underflow.size > 1) {
val move = overflows.head
val updatedUtil = utilization.updated(ordered.last, overflows.tail).updated(ordered.head, underflow :+ move)
var headOrdered = ordered.tail.takeWhile(next => updatedUtil.getOrElse(ordered.head, Seq.empty).size < updatedUtil.getOrElse(next, Seq.empty).size)
headOrdered = (headOrdered :+ ordered.head) ++ ordered.tail.drop(headOrdered.size)
var rearOrdered = headOrdered.takeWhile(next => updatedUtil.getOrElse(headOrdered.last, Seq.empty).size > updatedUtil.getOrElse(next, Seq.empty).size)
rearOrdered = (rearOrdered :+ headOrdered.last) ++ headOrdered.drop(rearOrdered.size).dropRight(1)/*drop the headOrdered.last*/
rebalanceRecursively(partitionsToMembers.updated(move, partitionsToMembers.getOrElse(move, Set.empty) + ordered.head - ordered.last), updatedUtil, rearOrdered)
}
else
partitionsToMembers
}
rebalanceRecursively(partitionsToMembers, utilization, ordered)
}
}
/**
* The main Actor of ZkCluster
* @param zkClient
* @param zkAddress
*/
class ZkClusterActor(implicit var zkClient: CuratorFramework,
zkAddress:Address,
rebalanceLogic:RebalanceLogic) extends FSM[ZkClusterState, ZkClusterData] with Stash with Logging {
import ZkCluster._
var whenZkClientUpdated = Seq.empty[ActorPath]
def partitionManager = context.actorSelection("../zkPartitions")
def requires(partitionKey:ByteString):Int = bytesToInt(zkClient.getData.forPath(s"/sizes/${keyToPath(partitionKey)}"))
def rebalance(partitionsToMembers:Map[ByteString, Set[Address]], members:Set[Address]) = {
val plan = rebalanceLogic.rebalance(rebalanceLogic.compensate(partitionsToMembers, members.toSeq, requires _), members)
logger.info("[leader] rebalance planned as:{}", plan)
partitionManager ! ZkRebalance(plan)
plan
}
val mandatory:StateFunction = {
case Event(ZkClientUpdated(updated), _) =>
zkClient = updated
whenZkClientUpdated.foreach(context.actorSelection(_) ! updated)
stay
case Event(ZkMonitorClient, _) =>
whenZkClientUpdated = whenZkClientUpdated :+ sender().path
stay
case Event(ZkQueryMembership, zkClusterData) =>
sender() ! ZkMembership(zkClusterData.members)
stay
case Event(origin: ZkMonitorPartition, _) =>
logger.info("[follower/leader] monitor partitioning from:{}", sender().path)
partitionManager forward origin
stay
case Event(origin: ZkStopMonitorPartition, _) =>
logger.info("[follower/leader] stop monitor partitioning from:{}", sender().path)
partitionManager forward origin
stay
}
startWith(ZkClusterUninitialized, ZkClusterData(None, Set.empty,
zkClient.getChildren.forPath("/partitions").map(partitionZNode => ByteString(pathToKey(partitionZNode)) -> Set.empty[Address]).toMap))
when(ZkClusterUninitialized)(mandatory orElse {
case Event(ZkLeaderElected(Some(address)), zkClusterData) =>
logger.info("[uninitialized] leader elected:{} and my zk address:{}", address, zkAddress)
if(address.hostPort == zkAddress.hostPort)
goto(ZkClusterActiveAsLeader) using zkClusterData.copy(leader = Some(address),
partitionsToMembers = rebalance(zkClusterData.partitionsToMembers, zkClusterData.members))
else
goto(ZkClusterActiveAsFollower) using zkClusterData.copy(leader = Some(address))
case Event(ZkMembersChanged(members), zkClusterData) =>
logger.info("[uninitialized] membership updated:{}", members)
stay using zkClusterData.copy(members = members)
case Event(_, _) =>
stash
stay
})
when(ZkClusterActiveAsFollower)(mandatory orElse {
case Event(ZkLeaderElected(Some(address)), zkClusterData) =>
if(address.hostPort == zkAddress.hostPort)
goto(ZkClusterActiveAsLeader) using zkClusterData.copy(leader = Some(address),
partitionsToMembers = rebalance(zkClusterData.partitionsToMembers, zkClusterData.members))
else
stay
case Event(ZkQueryLeadership, zkClusterData) =>
logger.info("[follower] leadership query answered:{} to:{}", zkClusterData.leader, sender().path)
zkClusterData.leader.foreach(address => sender() ! ZkLeadership(address))
stay
case Event(ZkMembersChanged(members), zkClusterData) =>
logger.info("[follower] membership updated:{}", members)
stay using zkClusterData.copy(members = members)
case Event(ZkPartitionsChanged(partitions), zkClusterData) =>
stay using zkClusterData.copy(partitionsToMembers = partitions)
case Event(origin @ ZkQueryPartition(key, _, Some(size), props, members), zkClusterData) =>
logger.info("[follower] partition query forwarded to leader:{}", zkClusterData.leader)
zkClusterData.leader.foreach(address => {
context.actorSelection(self.path.toStringWithAddress(address)) forward origin
})
stay
case Event(origin @ ZkQueryPartition(partitionKey, notification, None, _, _), zkClusterData) =>
zkClusterData.partitionsToMembers.get(partitionKey) match {
case Some(servants) if servants.nonEmpty => //use the snapshot mapping as long as it's available
sender() ! ZkPartition(partitionKey, orderByAge(partitionKey, servants), s"/partitions/${keyToPath(partitionKey)}", notification)
case _ => //local mapping wasn't available yet, have to go to leader for source of truth
zkClusterData.leader.foreach(address => {
context.actorSelection(self.path.toStringWithAddress(address)) forward origin
})
}
stay
case Event(remove:ZkRemovePartition, zkClusterData) =>
zkClusterData.leader.foreach(address => {
context.actorSelection(self.path.toStringWithAddress(address)) forward remove
})
stay
})
when(ZkClusterActiveAsLeader)(mandatory orElse {
case Event(ZkLeaderElected(Some(address)), zkClusterData) =>
if (address.hostPort == zkAddress.hostPort)
stay
else
goto(ZkClusterActiveAsFollower) using zkClusterData.copy(leader = Some(address))
case Event(ZkQueryLeadership, zkClusterData) =>
logger.info("[leader] leadership query answered:{} to:{}", zkClusterData.leader, sender().path)
zkClusterData.leader.foreach(address => sender() ! ZkLeadership(address))
stay
case Event(ZkMembersChanged(members), zkClusterData) =>
logger.info("[leader] membership updated:{}", members)
stay using zkClusterData.copy(members = members, partitionsToMembers = rebalance(zkClusterData.partitionsToMembers, members))
case Event(ZkQueryPartition(partitionKey, notification, Some(requires), props, _), zkClusterData) =>
val keyAsPath = keyToPath(partitionKey)
logger.info("[leader] partition creation:{}", keyAsPath)
val zkPath = guarantee(s"/partitions/${keyAsPath}", Some(props), CreateMode.PERSISTENT)
guarantee(s"/sizes/${keyAsPath}", Some(requires), CreateMode.PERSISTENT)
val plan = rebalance(zkClusterData.partitionsToMembers + (partitionKey -> Set.empty), zkClusterData.members)
sender() ! ZkPartition(partitionKey, orderByAge(partitionKey, plan.getOrElse(partitionKey, Set.empty)), zkPath, notification)
stay using zkClusterData.copy(partitionsToMembers = plan)
case Event(ZkQueryPartition(partitionKey, notification, None, _, _), zkClusterData) =>
logger.info("[leader] partition query:{} handled by leader cluster actor", keyToPath(partitionKey))
sender() ! ZkPartition(partitionKey, orderByAge(partitionKey, zkClusterData.partitionsToMembers.getOrElse(partitionKey, Set.empty)), s"/partitions/${keyToPath(partitionKey)}", notification)
stay
case Event(remove:ZkRemovePartition, zkClusterData) =>
logger.info("[leader] remove partition:{} forwarded to partition manager", keyToPath(remove.partitionKey))
partitionManager forward remove
stay
})
onTransition {
case ZkClusterUninitialized -> ZkClusterActiveAsFollower =>
//unstash all messages uninitialized state couldn't handle
unstashAll
//as a follower, I have to listen for the ZkPartitionsChanged event, since it's driven by ZkPartitionsManager and I must keep my partitionsToMembers snapshot up to date
partitionManager ! ZkMonitorPartition(whenChangeds = Set(self.path))
case ZkClusterUninitialized -> ZkClusterActiveAsLeader =>
//unstash all messages uninitialized state couldn't handle
unstashAll
case ZkClusterActiveAsFollower -> ZkClusterActiveAsLeader =>
//as the leader, I no longer need to handle the ZkPartitionsChanged event; I drive the changes instead, and ZkPartitionsManager will accept my partitionsToMembers
partitionManager ! ZkStopMonitorPartition(whenChangeds = Set(self.path))
case ZkClusterActiveAsLeader -> ZkClusterActiveAsFollower =>
//as a follower, I have to listen for the ZkPartitionsChanged event, since it's driven by ZkPartitionsManager and I must keep my partitionsToMembers snapshot up to date
partitionManager ! ZkMonitorPartition(whenChangeds = Set(self.path))
}
}
object ZkCluster extends ExtensionId[ZkCluster] with ExtensionIdProvider with Logging {
override def lookup(): ExtensionId[_ <: Extension] = ZkCluster
override def createExtension(system: ExtendedActorSystem): ZkCluster = {
val source = new File("squbsconfig", "zkcluster.conf")
logger.info("[zkcluster] reading configuration from:{}", source.getAbsolutePath)
val configuration = ConfigFactory.parseFile(source)
val zkConnectionString = configuration.getString("zkCluster.connectionString")
val zkNamespace = configuration.getString("zkCluster.namespace")
val zkAddress = external(system)
logger.info("[zkcluster] connection to:{} and namespace:{} address:{}", zkConnectionString, zkNamespace, zkAddress)
new ZkCluster(system, zkAddress, zkConnectionString, zkNamespace)
}
object DefaultRebalanceLogic extends RebalanceLogic
def guarantee(path:String, data:Option[Array[Byte]], mode:CreateMode = CreateMode.EPHEMERAL)(implicit zkClient:CuratorFramework):String = {
try{
data match {
case None => zkClient.create.withMode(mode).forPath(path)
case Some(bytes) => zkClient.create.withMode(mode).forPath(path, bytes)
}
}
catch{
case e: NodeExistsException => {
if(data.nonEmpty && data.get.length > 0){
zkClient.setData.forPath(path, data.get)
}
path
}
case e: Throwable => {
logger.info("leader znode creation failed due to %s\n", e)
path
}
}
}
def safelyDiscard(path:String, recursive:Boolean = true)(implicit zkClient:CuratorFramework):String = {
import scala.collection.JavaConversions._
try{
if(recursive)
zkClient.getChildren.forPath(path).foreach(child => safelyDiscard(s"$path/$child", recursive))
zkClient.delete.forPath(path)
path
}
catch{
case e: NoNodeException =>
path
case e: Throwable =>
path
}
}
def orderByAge(partitionKey:ByteString, members:Set[Address])(implicit zkClient:CuratorFramework):Seq[Address] = {
if(members.isEmpty)
Seq.empty[Address]
else {
val zkPath = s"/partitions/${keyToPath(partitionKey)}"
val ages = zkClient.getChildren.forPath(zkPath).map(child =>
AddressFromURIString.parse(pathToKey(child)) -> zkClient.checkExists.forPath(s"$zkPath/$child").getCtime).toMap
//this is to ensure that the partition query result always lists members from oldest to youngest
//this should make data sync easier: a newly onboarded member can always consult the first member in the query result to sync from
members.toSeq.sortBy(ages.getOrElse(_, 0L))
}
}
def ipv4 = {
val addresses = mutable.Set.empty[String]
val enum = NetworkInterface.getNetworkInterfaces
while (enum.hasMoreElements) {
val addrs = enum.nextElement.getInetAddresses
while (addrs.hasMoreElements) {
addresses += addrs.nextElement.getHostAddress
}
}
val pattern = "\\d+\\.\\d+\\.\\d+\\.\\d+".r
val matched = addresses.filter({
case pattern() => true
case _ => false
})
.filter(_ != "127.0.0.1")
matched.head
}
private[cluster] def myAddress = InetAddress.getLocalHost.getCanonicalHostName match {
case "localhost" => ipv4
case h:String => h
}
private[cluster] def external(system:ExtendedActorSystem):Address = Address("akka.tcp", system.name, ipv4, system.provider.getDefaultAddress.port.getOrElse(8086))
def keyToPath(name:String):String = URLEncoder.encode(name, "utf-8")
def pathToKey(name:String):String = URLDecoder.decode(name, "utf-8")
private[cluster] val BYTES_OF_INT = Integer.SIZE / java.lang.Byte.SIZE
implicit def intToBytes(integer:Int):Array[Byte] = {
val buf = ByteBuffer.allocate(BYTES_OF_INT)
buf.putInt(integer)
buf.rewind
buf.array()
}
implicit def bytesToInt(bytes:Array[Byte]) = ByteBuffer.wrap(bytes).getInt
implicit def bytesToUtf8(bytes:Array[Byte]):String = new String(bytes, Charsets.UTF_8)
implicit def byteStringToUtf8(bs:ByteString):String = new String(bs.toArray, Charsets.UTF_8)
implicit def addressToBytes(address:Address):Array[Byte] = {
address.toString.getBytes(Charsets.UTF_8)
}
implicit def bytesToAddress(bytes:Array[Byte]):Option[Address] = {
bytes match {
case null => None
case _ if bytes.length == 0 => None
case _ => {
val uri = new String(bytes, Charsets.UTF_8)
Some(AddressFromURIString(uri))
}
}
}
implicit def bytesToByteString(bytes:Array[Byte]):ByteString = {
ByteString(bytes)
}
}
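For reference, a minimal usage sketch (not part of the original gist) of how an application might interact with the extension through zkClusterActor, using the messages defined above. The actor, app and system names are made up for illustration, and running it still requires a reachable ZooKeeper ensemble plus the squbsconfig/zkcluster.conf file read by ZkCluster.createExtension:

import akka.actor.{Actor, ActorSystem, Props}
import akka.util.ByteString
import org.squbs.cluster._

//a hypothetical client actor that queries leadership, requests a partition of size 3 and watches partition changes
class PartitionUser extends Actor {
  val zkCluster = ZkCluster(context.system) //obtain the extension for this actor system

  override def preStart(): Unit = {
    zkCluster.zkClusterActor ! ZkQueryLeadership
    zkCluster.zkClusterActor ! ZkQueryPartition(ByteString("some-partition"), createOnMiss = Some(3))
    zkCluster.zkClusterActor ! ZkMonitorPartition(whenChangeds = Set(self.path))
  }

  def receive: Actor.Receive = {
    case ZkLeadership(address) =>
      println(s"current leader: $address") //Akka address of the current leader
    case ZkPartition(key, members, _, _) =>
      println(s"partition ${key.utf8String} served by: $members") //members ordered oldest first
    case ZkPartitionsChanged(partitions) =>
      println(s"partition assignments changed: ${partitions.size} partitions") //latest snapshot
  }
}

object PartitionUserApp extends App {
  val system = ActorSystem("myClusterSystem") //assumed actor system name
  system.actorOf(Props[PartitionUser], "partitionUser")
}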
@inexplicable (Author) commented:
ZooKeeper needs to run externally; at a minimum, the configuration must provide the connection string.
Curator framework 2.4.1 is required as of now.
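For reference, a minimal sketch of what squbsconfig/zkcluster.conf could look like, based on the two keys read in ZkCluster.createExtension above (the hosts and namespace are placeholders):

zkCluster {
  connectionString = "zkhost1:2181,zkhost2:2181,zkhost3:2181"
  namespace = "mycluster"
}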

@inexplicable (Author) commented:
Rebalancing + compensation are both available now; also added handling for lost ZooKeeper connections.
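To illustrate (a made-up example, not from the gist): compensate first tops up any partition that has fewer members than its required size, and rebalance then evens out how many partitions each member serves. A self-contained sketch using the DefaultRebalanceLogic defined above, with hypothetical addresses and a fixed size of 2 per partition:

import akka.actor.Address
import akka.util.ByteString
import org.squbs.cluster.ZkCluster.DefaultRebalanceLogic

object RebalanceDemo extends App {
  //three hypothetical cluster members
  val a = Address("akka.tcp", "myClusterSystem", "10.0.0.1", 8086)
  val b = Address("akka.tcp", "myClusterSystem", "10.0.0.2", 8086)
  val c = Address("akka.tcp", "myClusterSystem", "10.0.0.3", 8086)

  //two partitions, both currently served by member a only
  val current = Map(
    ByteString("p1") -> Set(a),
    ByteString("p2") -> Set(a))

  //pretend every partition requires 2 members
  val sizes: ByteString => Int = _ => 2

  //top up partitions that are short of members, then spread the load evenly
  val compensated = DefaultRebalanceLogic.compensate(current, Seq(a, b, c), sizes)
  val rebalanced = DefaultRebalanceLogic.rebalance(compensated, Set(a, b, c))

  rebalanced.foreach { case (key, members) => println(s"${key.utf8String} -> $members") }
}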
