Skip to content

Instantly share code, notes, and snippets.

@ericacm
Last active October 11, 2015 23:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ericacm/3936160 to your computer and use it in GitHub Desktop.
Save ericacm/3936160 to your computer and use it in GitHub Desktop.
Start ZooKeeper server
// Comma-separated list of ZooKeeper servers, in the form host:port,host:port.
// The value comes from the "zookeeperService.hosts" property placeholder, with
// "localhost" as the fallback after the colon (this is Spring's @Value default
// syntax -- confirm Spring is the injector here).
// NOTE(review): the field is initialised to null (`= _`), so any code that reads
// it before injection happens will see null -- verify the initialisation order
// in the enclosing class.
@Value("${zookeeperService.hosts:localhost}")
@ManagedGetter @BeanProperty
var hosts: String = _
// Split the configured host list and derive, for every entry:
//   - a normalised "host:port" client address (written back into hostsArr),
//   - the quorum property key   "server.<n>"  (1-based),
//   - the quorum property value "host:<port+1>:<port+2>".
// Entries without an explicit ":port" suffix use clientPort.
val hostsArr = hosts.split(",")
val serversKeyArr = new Array[String](hostsArr.length)
val serversValArr = new Array[String](hostsArr.length)
hostsArr.indices.foreach { i =>
  val entry = hostsArr(i)
  val (name, basePort) =
    if (!entry.contains(":")) {
      (entry, clientPort)
    } else {
      val pieces = entry.split(":")
      (pieces(0), pieces(1).toInt)
    }
  hostsArr(i) = name + ":" + basePort
  serversKeyArr(i) = "server." + (i + 1)
  serversValArr(i) = name + ":" + (basePort + 1) + ":" + (basePort + 2)
}
// Connection string handed to clients: all normalised entries, comma-joined.
val connectString = hostsArr.mkString(",")
/**
 * Starts an embedded ZooKeeper server on a background thread and waits up to
 * `waitingSec` seconds for an early startup failure.
 *
 * With more than one entry in `hostsArr` a replicated (quorum) peer is started
 * and a `myid` file is written into the data directory for the entry equal to
 * `nodeId`; otherwise a standalone server is started.
 *
 * @param hostsArr      normalised server addresses, one per ensemble member
 * @param serversKeyArr quorum property keys, index-aligned with `hostsArr`
 * @param serversValArr quorum property values, index-aligned with `hostsArr`
 * @throws RuntimeException wrapping the original exception if the server thread
 *                          fails while starting (or running) within the wait window
 */
def startServer(
    hostsArr: Array[String],
    serversKeyArr: Array[String],
    serversValArr: Array[String]): Unit = {
  if (dataDir == null || dataDir == "") {
    dataDir = "./zookeeper-" + nodeId
  }
  val dir = new File(dataDir)
  val startupProperties = new Properties()
  val useQuorum = hostsArr.length > 1
  if (useQuorum) {
    for ((server, idx) <- hostsArr.zipWithIndex) {
      startupProperties.put(serversKeyArr(idx), serversValArr(idx))
      // NOTE(review): this compares a hostsArr entry with nodeId verbatim --
      // confirm nodeId uses the same format as the entries passed in.
      if (server == nodeId) {
        // The data directory may not exist yet on a first run; without it the
        // FileWriter below fails with FileNotFoundException.
        dir.mkdirs()
        val fw = new FileWriter(new File(dir, "myid"))
        // Always close the writer, even if the write itself fails.
        try {
          fw.write((idx + 1).toString + "\n")
        } finally {
          fw.close()
        }
      }
    }
  }
  startupProperties.put("tickTime", "1000")
  startupProperties.put("initLimit", "10")
  startupProperties.put("syncLimit", "5")
  startupProperties.put("dataDir", dataDir)
  startupProperties.put("clientPort", clientPort.toString)
  startupProperties.put("skipACL", "true")
  startupProperties.put("autopurge.purgeInterval", "24")

  // Both modes parse the same properties; only the launcher differs.
  val quorumConfiguration = new QuorumPeerConfig
  quorumConfiguration.parseProperties(startupProperties)
  val serverStartFunc: () => Unit =
    if (useQuorum) { // Replicated Zookeeper
      quorumPeerMain = new QuorumPeerMain
      () => quorumPeerMain.runFromConfig(quorumConfiguration)
    } else { // Standalone Zookeeper
      val configuration = new ServerConfig
      configuration.readFrom(quorumConfiguration)
      zkServerMain = new ZooKeeperServerMain
      () => zkServerMain.runFromConfig(configuration)
    }

  // Released when the server thread finishes, normally or with an exception.
  // runFromConfig is expected to block while the server is running (confirm for
  // the ZooKeeper version in use), so a healthy start simply times out below,
  // whereas a failure now wakes the caller immediately instead of after the
  // full waitingSec.
  val serverStarted = new Semaphore(0)
  // Written by the server thread and read by the caller, hence @volatile.
  @volatile var serverStartException: Option[Exception] = None
  startThread("startZookeeper") {
    try {
      log.info("Starting ZooKeeper server. clientPort=" + clientPort + " replicated=" + useQuorum)
      serverStartFunc()
    } catch {
      case ioex: IOException =>
        log.error("ZooKeeper Failed - data directory corruption: " + ioex.getMessage, ioex)
        serverStartException = Some(ioex)
      case ex: Exception =>
        log.error("ZooKeeper Failed: " + ex.getMessage, ex)
        serverStartException = Some(ex)
    } finally {
      serverStarted.release()
    }
  }

  // Wait for either an early exit of the server thread or the timeout, then
  // surface any failure recorded by that thread.
  serverStarted.tryAcquire(waitingSec, TimeUnit.SECONDS)
  serverStartException.foreach { ex => throw new RuntimeException(ex) }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment