Skip to content

Instantly share code, notes, and snippets.

@purijatin
Last active October 3, 2016 11:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save purijatin/8912a79bc0a5a403697284153a22bde4 to your computer and use it in GitHub Desktop.
Save purijatin/8912a79bc0a5a403697284153a22bde4 to your computer and use it in GitHub Desktop.
Distributed Lock based on ZooKeeper
package zookeeper
import java.util.concurrent.{ExecutorService, Executors}
import org.apache.zookeeper._
import scala.collection.JavaConverters._
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.control._
/**
* Created by puri on 9/30/2016.
*/
/**
 * Small driver that exercises [[ZooKeeperBasedLock]]: it starts 59 concurrent tasks, each of
 * which opens its own ZooKeeper session, acquires the lock, releases it again and closes the
 * session. The program waits at most 60 seconds for all tasks to finish.
 */
object Prac2 {

  /**
   * Program entry point.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val pool: ExecutorService = Executors.newCachedThreadPool()
    implicit val exec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try {
      val f = Future.sequence(
        for (_ <- 1 to 59) yield Future {
          val zoo: ZooKeeperBasedLock = new ZooKeeperBasedLock()
          try {
            zoo.lock()
            zoo.unlock()
          } finally {
            // Always end the session: this frees the connection and, should unlock() have
            // failed, lets the server drop this contender's ephemeral lock node.
            zoo.zoo.close()
          }
        })
      Await.result(f, 60.seconds)
    } finally {
      // Runs on timeout/failure too. On success every task has already finished, so this is
      // equivalent to shutdown(); on failure it interrupts tasks still parked waiting for the
      // lock so that the non-daemon pool threads cannot keep the JVM alive.
      pool.shutdownNow()
    }
  }
}
/**
 * Companion of the lock class; hosts the console logging helper.
 */
object ZooKeeperBasedLock {

  /** When `true`, every message passed to [[LOG]] is printed, not only priority ones. */
  val logEnabled = false

  /**
   * Prints a message to standard output, prefixed with the current thread's description.
   *
   * @param n        the message; passed by name, so it is only evaluated when it is printed
   * @param priority when `true` the message is printed even if [[logEnabled]] is `false`
   */
  def LOG(n: => String, priority: Boolean = false) = {
    val shouldPrint = priority || logEnabled
    if (shouldPrint) {
      val caller = Thread.currentThread()
      println(s"$caller - $n")
    }
  }
}
/**
 * A distributed mutual-exclusion lock backed by ZooKeeper ephemeral-sequential nodes.
 *
 * Each call to [[lock]] creates a node `<rootPath>/lock-<sequence>`. The contender owning the
 * node with the lowest sequence number holds the lock; every other contender watches only the
 * node immediately ahead of its own and re-checks when that watch (or any session event) fires.
 *
 * The root node must already exist; this class does not create it. An instance is meant to be
 * driven by a single thread at a time (the ZooKeeper event thread only calls [[process]]).
 *
 * @param connectString    ZooKeeper `host:port` connect string
 * @param sessionTimeoutMs requested session timeout in milliseconds (the original code passed
 *                         the port number here by mistake)
 * @param rootPath         parent znode of the lock nodes, without a trailing slash
 */
class ZooKeeperBasedLock(
    connectString: String = "127.0.0.1:2181",
    sessionTimeoutMs: Int = 10000,
    rootPath: String = "/1"
) extends Watcher {
  import ZooKeeperBasedLock._

  // NOTE: the fields below are declared BEFORE `zoo` on purpose. The ZooKeeper client may call
  // process() from its event thread as soon as it is constructed, so everything process()
  // touches has to be initialised first.

  /**
   * Monitor on which the locking thread parks while waiting for a watch event. It stays a
   * public `var` only for source compatibility; this class never reassigns it.
   */
  var obj: Object = new Object

  /** Set by process() when any event arrives; guarded by `obj`. */
  private var eventSeen = false

  /** Full path of the lock node currently held, if any. */
  private var lockKey: Option[String] = None

  private val nodePrefix = "lock-"

  val zoo: ZooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, this)

  /**
   * Blocks until this instance holds the lock.
   *
   * If acquisition fails part-way, the node created for this attempt is removed on a
   * best-effort basis so it does not block other contenders.
   *
   * @throws IllegalStateException if the lock is already held by this instance, or if this
   *                               contender's node disappears while waiting
   */
  def lock(): Unit = {
    lockKey.foreach { held =>
      throw new IllegalStateException("Lock called while already holding: " + held)
    }
    val root = rootPath + "/"
    val created =
      zoo.create(root + nodePrefix, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL)
    val ownName = created.substring(root.length)
    var acquired = false
    try {
      while (!acquired) {
        // Only lock nodes take part in the queue; the zero-padded sequence suffix makes
        // lexicographic order equal to creation order.
        val queue = zoo.getChildren(rootPath, false).asScala.filter(_.startsWith(nodePrefix)).sorted
        val position = queue.indexOf(ownName)
        if (position < 0) {
          throw new IllegalStateException("Lock node no longer exists: " + created)
        } else if (position == 0) {
          LOG("Lock obtained")
          acquired = true
        } else {
          val predecessor = root + queue(position - 1)
          LOG("Could not obtain lock. Current: " + created + ". Waiting on: " + predecessor)
          awaitEventAfterWatching(predecessor)
        }
      }
      lockKey = Some(created)
    } finally {
      if (!acquired) deleteQuietly(created)
    }
  }

  /**
   * Releases the lock by deleting this instance's lock node.
   *
   * @throws IllegalStateException if the lock is not currently held by this instance
   */
  def unlock(): Unit = {
    val key = lockKey.getOrElse(
      throw new IllegalStateException("Unlock called before calling Lock"))
    LOG("-----------------------------Over and releasing the lock now: " + key, priority = true)
    zoo.delete(key, -1)
    // Cleared only after a successful delete so a failed unlock can be retried.
    lockKey = None
  }

  /** Closes the underlying ZooKeeper session (which also removes any ephemeral lock node). */
  def close(): Unit = zoo.close()

  /**
   * Watcher callback, invoked by the ZooKeeper client for session and node events. Records
   * that an event happened and wakes the thread parked in [[lock]], which then re-checks the
   * queue.
   *
   * @param event the event delivered by ZooKeeper (its content is not inspected)
   */
  override def process(event: WatchedEvent): Unit = obj.synchronized {
    LOG("Notifying All", false)
    eventSeen = true
    obj.notifyAll()
  }

  /** Sets a watch on `path` and, if that node still exists, parks until any event arrives. */
  private def awaitEventAfterWatching(path: String): Unit = {
    // Reset the flag BEFORE registering the watch: an event that fires between exists() and
    // wait() then leaves the flag set and the wait is skipped instead of being lost.
    obj.synchronized { eventSeen = false }
    if (zoo.exists(path, this) != null) {
      obj.synchronized {
        while (!eventSeen) obj.wait()
      }
    }
  }

  /** Best-effort removal of a lock node left behind by a failed acquisition attempt. */
  private def deleteQuietly(path: String): Unit =
    try zoo.delete(path, -1)
    catch {
      // The node is ephemeral, so the server removes it when the session ends anyway.
      case e: KeeperException =>
        LOG("Could not remove abandoned lock node " + path + ": " + e, priority = true)
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment