Skip to content

Instantly share code, notes, and snippets.

@rsumbaly
Created November 1, 2011 01:35
Show Gist options
  • Save rsumbaly/1329607 to your computer and use it in GitHub Desktop.
Save rsumbaly/1329607 to your computer and use it in GitHub Desktop.
package kafka.schemaregistry.util
import org.I0Itec.zkclient.ZkClient
import java.util.concurrent.atomic.AtomicBoolean
import org.apache.log4j.Logger
import org.apache.zookeeper.CreateMode
import java.util.concurrent.Callable
import org.I0Itec.zkclient.exception.{ZkInterruptedException, ZkException}
trait LockListener {
def lockAcquired
def lockReleased
}
trait ZkOperation {
def execute: Boolean
}
case class ZNodeName(name: String) extends Comparable[ZNodeName] {
private val logger = Logger.getLogger(getClass)
if (name == null)
throw new NullPointerException("Id cannot be null")
var prefix: String = name
var idx: Int = name.lastIndexOf('-')
var sequence: Int = -1
if (idx >= 0) {
prefix = name.substring(0, idx)
try {
sequence = Integer.parseInt(name.substring(idx + 1))
} catch {
case e1: NumberFormatException => {
logger.info("Number format exception for " + idx, e1)
}
case e2: ArrayIndexOutOfBoundsException => {
logger.info("Array out of bounds for " + idx, e2)
}
}
}
override def toString: String = name.toString
override def compareTo(that: ZNodeName): Int = {
var answer: Int = prefix.compareTo(that.prefix)
if (answer == 0) {
val s1: Int = this.sequence
val s2: Int = that.sequence
if (s1 == -1 && s2 == -1)
this.name.compareTo(that.name)
answer = if (s1 == -1) 1 else if (s2 == -1) -1 else s1 - s2
}
answer
}
}
class WriteLock(zkClient: ZkClient, dir: String, lockListener: LockListener) {
class LockZkOperation extends ZkOperation {
override def execute: Boolean = {
do {
if (id == null) {
val sessionId: Long = zkClient.
}
} while (id == null)
return false
}
}
private val logger = Logger.getLogger(getClass)
val zop: LockZkOperation = new LockZkOperation()
var id: String = null
val ownerId: String = null
val closed: AtomicBoolean = new AtomicBoolean(false)
val retryDelayMs: Long = 500L
val retryCount: Int = 10
def close = closed.compareAndSet(false, true)
def isClosed = closed.get
def retryOperation(operation: ZkOperation): Boolean = {
var exception: Exception = null
for (i <- 0 until retryCount) {
try {
return operation.execute
} catch {
case e: ZkException => {
if (exception != null) exception = e
if (i > 0) {
try {
Thread.sleep(i * retryDelayMs)
} catch {
case e1: InterruptedException => logger.debug("Failed to sleep ", e1)
}
}
}
}
}
throw exception
}
def ensurePathExists(path: String) = {
retryOperation(new ZkOperation() {
override def execute() = {
if (!zkClient.exists(path))
zkClient.createPersistent(path, true)
}
})
}
def isOwner = id != null && ownerId != null && id.equals(ownerId)
def unlock = {
synchronized {
if (!isClosed && id != null) {
try {
zkClient.delete(id)
} catch {
case e: ZkInterruptedException => {
logger.warn("Caught interrupted exception ", e)
Thread.currentThread.interrupt
}
case e1 => throw e1
} finally {
if (lockListener != null) {
lockListener.lockReleased
}
id = null
}
}
}
}
def lock: Boolean = {
synchronized({
if (isClosed) false
// Check if path exists
ensurePathExists(dir)
// Run the actual locking operation
retryOperation(zop)
})
}
}
package kafka.schemaregistry.util
import junit.framework.Assert._
import org.junit.Test
import org.scalatest.junit.JUnit3Suite
import java.util.TreeSet
class ZNodeNameTest extends JUnit3Suite {
@Test
def testOrderWithSamePrefix() {
val names: Array[String] = Array("x-3", "x-5", "x-11", "x-1")
val expected: Array[String] = Array("x-1", "x-3", "x-5", "x-11")
assertOrderedNodeNames(names, expected);
}
@Test
def testOrderWithDifferentPrefixes() {
val names: Array[String] = Array("r-3", "r-2", "r-1", "w-2", "w-1")
val expected: Array[String] = Array("r-1", "r-2", "r-3", "w-1", "w-2")
assertOrderedNodeNames(names, expected);
}
private def assertOrderedNodeNames(names: Array[String], expected: Array[String]) {
import scala.collection.JavaConversions._
val size: Int = names.length;
assertEquals("The two arrays should be the same size!", names.length, expected.length);
var nodeNames: TreeSet[ZNodeName] = new TreeSet[ZNodeName]()
names.foreach(name => {
nodeNames.add(new ZNodeName(name))
})
var index: Int = 0;
val iter: Iterator[ZNodeName] = nodeNames.iterator
while (iter.hasNext) {
val nodeName: ZNodeName = iter.next
assertEquals("Node " + index, expected(index), nodeName.name)
index = index + 1
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment