Last active
October 3, 2016 11:22
-
-
Save purijatin/8912a79bc0a5a403697284153a22bde4 to your computer and use it in GitHub Desktop.
Distributed Lock based on ZooKeeper
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package zookeeper | |
import java.util.concurrent.{ExecutorService, Executors} | |
import org.apache.zookeeper._ | |
import scala.collection.JavaConverters._ | |
import scala.concurrent.duration._ | |
import scala.concurrent.{Await, ExecutionContext, Future} | |
import scala.util.control._ | |
/** | |
* Created by puri on 9/30/2016. | |
*/ | |
object Prac2 { | |
def main(args: Array[String]) { | |
val pool: ExecutorService = Executors.newCachedThreadPool() | |
implicit val exec = ExecutionContext.fromExecutor(pool) | |
val f = Future.sequence( | |
for (i <- 1 to 59) yield | |
Future.apply { | |
val zoo: ZooKeeperBasedLock = new ZooKeeperBasedLock() | |
zoo.lock() | |
zoo.unlock() | |
}) | |
Await.result(f, 60 seconds) | |
pool.shutdown() | |
} | |
} | |
object ZooKeeperBasedLock { | |
val logEnabled = false | |
def LOG(n: => String, priority: Boolean = false) = { | |
if (logEnabled || priority) | |
println(Thread.currentThread() + " - " + n) | |
} | |
} | |
class ZooKeeperBasedLock() extends Watcher { | |
val zoo: ZooKeeper = new ZooKeeper("127.0.0.1", 2181, this) | |
import ZooKeeperBasedLock._ | |
private var lockKey: String = null | |
var obj = new Object | |
def lock() = { | |
val loop = new Breaks | |
val name = "lock-" | |
val root: String = "/1/" | |
val lock: String = root + name | |
val created = zoo.create(lock, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL) | |
loop.breakable { | |
while (true) { | |
val child = zoo.getChildren("/1", this).asScala.sorted | |
if (root + child.head == created) { | |
LOG("Lock obtained") | |
loop.break() | |
} else { | |
LOG("Could not obtain lock. Current: " + created + ". Waiting on: " + child.head) | |
if (zoo.exists(created, true) != null) { | |
LOG(s"$created. Waiting on ${child.head}") | |
if (obj != null) { | |
obj.synchronized { | |
obj.wait() | |
} | |
} | |
} | |
} | |
} | |
} | |
lockKey = created | |
} | |
def unlock() = { | |
if (lockKey == null) | |
throw new IllegalStateException("Unlock called before calling Lock") | |
LOG("-----------------------------Over and releasing the lock now: " + lockKey, priority = true) | |
zoo.delete(lockKey, -1) | |
} | |
override def process(event: WatchedEvent): Unit = { | |
if (obj != null) { | |
obj.synchronized { | |
LOG("Notifying All", false) | |
obj.notifyAll() | |
obj = null | |
} | |
} | |
} | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment