Skip to content

Instantly share code, notes, and snippets.

@feliperazeek
Created June 16, 2012 22:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save feliperazeek/2942666 to your computer and use it in GitHub Desktop.
Save feliperazeek/2942666 to your computer and use it in GitHub Desktop.
package com.klout.api.services
import akka.agent.Agent
import akka.dispatch._
import org.joda.time._
import com.klout.playful2.sugar._
import com.klout.playful2.zookeeper._
import com.klout.playful2.actors._
import java.util.{ Map => javaMap }
import java.{ lang => java }
import play.Logger
import com.klout.api.models._
import org.apache.zookeeper.KeeperException.NoNodeException
/**
 * Component giving access to the service that manages the state of a MySQL cluster.
 * The health of each node in the cluster is defined by nodes watched on ZooKeeper.
 *
 * @author Felipe Oliveira [@_felipera]
 */
trait MySQLStateServiceComponent {

  /**
   * Operations for querying the health of the MySQL cluster.
   */
  trait MySQLStateService {

    /**
     * Returns the current health state of MySQL nodes
     */
    def currentState(): Map[String, Any]

    /**
     * Return a list of currently healthy MySQL nodes
     */
    def getHealthyNodes(): List[String]
  }

  /**
   * The service instance; a concrete component has to supply it.
   */
  val mysqlStateService: MySQLStateService
}
/**
* This is the implementation of the service interface defined above.
*
* @author Felipe Oliveira [@_felipera]
*/
trait AkkaAgentMySQLStateServiceComponent extends MySQLStateServiceComponent {
override lazy val mysqlStateService: MySQLStateService = {
/**
 * Base path on Zookeeper below which the health entry of every MySQL node is watched.
 * Read from the "mysql.zk.base.path" configuration key.
 * NOTE(review): `!` comes from the project's config sugar; presumably it unwraps a required value - confirm.
 */
lazy val nodeBasePath = config.string("mysql.zk.base.path").!
/**
 * Path of the Zookeeper node watched for override updates, used to force the nodes the API talks to.
 * Read from the "mysql.zk.override.path" configuration key.
 */
lazy val nodeOverridePath = config.string("mysql.zk.override.path").!
/**
 * Build the initial state of the MySQL cluster from what is currently stored on Zookeeper.
 *
 * Every node listed by MySQLClusterState.allNodes gets the status resolved by getCurrentNodeStatus,
 * falling back to MySQLDown when no status could be resolved.
 *
 * The override list is read from nodeOverridePath as a comma-separated string. Entries are trimmed and
 * empty entries are dropped, so an empty or blank value does not turn into an override made of an
 * empty node name. A missing Zookeeper node, a null value, or a value without any usable entry all
 * mean "no override" (None).
 *
 * @return the cluster state built from the current Zookeeper contents
 */
def initialState: MySQLClusterState = {
  val nodes: Map[String, MySQLNodeStatus] = MySQLClusterState.allNodes.map { node =>
    (node, getCurrentNodeStatus(node).getOrElse(MySQLDown))
  }.toMap

  val overrideNodes: Option[List[String]] = try {
    Option(Shelves.get(nodeOverridePath))
      // "".split(",") yields Array(""), hence the explicit clean-up of blank entries
      .map(_.toString.split(",").toList.map(_.trim).filter(_.nonEmpty))
      // A list without any usable entry is the same as having no override at all
      .filter(_.nonEmpty)
  } catch {
    // The override node does not exist on Zookeeper: nothing is being forced
    case _: NoNodeException => None
  }

  MySQLClusterState(nodes, overrideNodes)
}
/**
 * Build the Zookeeper path of a MySQL node.
 *
 * @param node the MySQL node
 * @return nodeBasePath and the node joined by a "/"
 */
def getNodePath(node: String) = List(nodeBasePath, node).mkString("/")
/**
 * Resolve the health of a node from the value currently stored at its Zookeeper path.
 *
 * @param node the MySQL node to look up
 * @return the status parsed by getNodeStatus (None when the stored value is not recognised),
 *         or MySQLDown when the node's path does not exist on Zookeeper
 */
def getCurrentNodeStatus(node: String): Option[MySQLNodeStatus] =
  try {
    val rawValue = Shelves.get(getNodePath(node)).toString
    val health = getNodeStatus(rawValue)
    please.log("Node Health - Node: " + node + ", Status: " + health)
    health
  } catch {
    // A missing Zookeeper node is reported as a MySQL node that is down
    case _: NoNodeException => Some(MySQLDown)
  }
/**
 * Translate the raw value of a Zookeeper node into a node status.
 *
 * Only the exact values "up", "down" and "degraded" are recognised.
 *
 * @param status the value read from Zookeeper
 * @return the matching status, or None for any other value
 */
def getNodeStatus(status: String): Option[MySQLNodeStatus] = {
  val knownStatuses = Map[String, MySQLNodeStatus](
    "up" -> MySQLUp,
    "down" -> MySQLDown,
    "degraded" -> MySQLDegraded
  )
  knownStatuses.get(status)
}
/**
 * Akka agent wrapper for the MySQL cluster state, seeded with initialState.
 * It has to exist before the Zookeeper watchers are registered, because their handlers send
 * every state change to this agent.
 */
val agent = Agent(initialState)
/**
 * Watch the Zookeeper path of every MySQL node and push each change into the agent.
 *
 * - an update carrying a recognised status records that status for the node
 * - an update carrying an unrecognised value is only logged; nothing is sent to the agent
 * - a deleted Zookeeper node marks the MySQL node as down
 * - any other event is ignored
 */
MySQLClusterState.allNodes.foreach { node =>
  Shelves.on(getNodePath(node)) {
    case NodeUpdated(Some(value)) =>
      getNodeStatus(value) match {
        case Some(status) =>
          please warn "MySQL Node Update - Node: " + node + ", Value: " + value + ", Status: " + status
          agent send (_ withNodeUpdate (node, status))
        case None =>
          please warn "Unknown MySQL Status: " + value + " (Node: " + node + ")"
      }
    case NodeDeleted =>
      please warn "MySQL Node Deleted - Node: " + node
      agent send (_ withNodeUpdate (node, MySQLDown))
    // Evaluate to the unit value `()`; a bare `Unit` refers to the companion object of the Unit type
    case _ => ()
  }
}
/**
 * Watch for override updates so we can force the API to talk to specific nodes.
 *
 * - an update carrying a value hands that value to the cluster state as the new override
 * - a deleted Zookeeper node removes the override
 * - any other event is ignored
 *
 * NOTE(review): the raw string is passed to withOverrideNodes here; presumably it splits the
 * comma-separated value itself - confirm against MySQLClusterState.
 */
Shelves.on(nodeOverridePath) {
  case NodeUpdated(Some(value)) =>
    please warn "MySQL Nodes Override: " + value
    agent send (_ withOverrideNodes (Option(value)))
  case NodeDeleted =>
    please warn "MySQL Nodes Override Removed!"
    agent send (_ withOverrideNodes (None))
  // Evaluate to the unit value `()`; a bare `Unit` refers to the companion object of the Unit type
  case _ => ()
}
new AkkaAgentMySQLStateService(agent)
}
/**
 * MySQLStateService that answers from the MySQLClusterState held by an Akka agent.
 *
 * @param state agent from which the cluster state is read
 */
class AkkaAgentMySQLStateService(private val state: Agent[MySQLClusterState]) extends MySQLStateService {

  /**
   * If there are no nodes available do we have a fallback?
   * Read from the "mysql.zk.fallback.enabled" configuration key.
   */
  lazy val fallbackEnabled = config.boolean("mysql.zk.fallback.enabled").!

  /**
   * If the fallback is enabled what's the node we should fallback to?
   * Read from the "mysql.zk.fallback.node" configuration key.
   */
  lazy val fallbackNode = config.string("mysql.zk.fallback.node").!

  /**
   * Return a list of currently healthy MySQL nodes.
   *
   * The list is chosen in this order of precedence:
   *  1. the override nodes, when any are set
   *  2. the nodes the cluster state reports as healthy
   *  3. the configured fallback node, when the fallback is enabled
   *  4. an empty list
   *
   * @return the nodes that should be used; may be empty
   */
  override def getHealthyNodes(): List[String] = {
    // Read the agent once so both lists below come from the same cluster state
    val snapshot = state()
    val healthy = snapshot.getHealthyNodes
    val overrides = snapshot.overrideNodes.getOrElse(Nil)
    please debug "MySQL Cluster State - Nodes: " + healthy + ", Override: " + overrides

    val selected: List[String] =
      if (overrides.nonEmpty) {
        // Override nodes always win
        overrides
      } else if (healthy.nonEmpty) {
        healthy
      } else if (fallbackEnabled) {
        please debug "There are no healthy MySQL nodes available, using fallback node: " + fallbackNode
        List(fallbackNode)
      } else {
        // No fallback is enabled, just return an empty list which should probably cause an exception to be thrown
        Nil
      }

    please debug "MySQL Nodes: " + selected
    selected
  }

  /**
   * Returns the current health state of MySQL nodes, as reported by the cluster state held by the agent.
   */
  override def currentState(): Map[String, Any] = state().currentState
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment