Skip to content

Instantly share code, notes, and snippets.

@arosien
Created November 26, 2012 19:48
Show Gist options
  • Save arosien/4150216 to your computer and use it in GitHub Desktop.
Save arosien/4150216 to your computer and use it in GitHub Desktop.
YMMV: a Scala ZooKeeper (zk) DSL for associating domain models with ZooKeeper paths and serialization formats
/**
 * Binds a ZooKeeper node `path` to a `client` and exposes typed read, write and
 * delete operations on that node.
 *
 * Client calls are run through a catcher for `KeeperException`, so such a failure is
 * returned as a `Left` instead of being thrown.
 *
 * @param path   path of the node this instance operates on
 * @param client client every operation is delegated to
 */
case class WithZk(path: String, client: ZooKeeperClient) {
  import WithZk._
  import scala.util.control.Exception._

  /** Result of an operation: the failure on the left, the value on the right. */
  type R[A] = Either[Throwable, A]

  /**
   * `path` with its last '/'-separated segment removed (e.g. "/a/b" gives "/a").
   *
   * `dropRight(1)` is used rather than `tail`/`init` so that a path which splits into
   * no segments at all (such as "/") yields the empty string instead of throwing.
   */
  lazy val dir: String = path.split('/').dropRight(1).mkString("/")

  // Turns a KeeperException thrown by the wrapped expression into a Left.
  private val catcher = catching(classOf[KeeperException])

  /** A reader that decodes this node's data with the implicit `Reads[A]`. */
  def reader[A: Reads]: Reader[A] = new Reader[A]

  /** A writer that encodes `value` with the implicit `Writes[A]`. */
  def writer[A: Writes](value: A): Writer[A] = new Writer[A](value)

  /** A deleter for this node. */
  def deleter: Deleter = new Deleter

  // Shortcuts that build a Reader / Writer / Deleter and call a single method on it.
  def get[A: Reads]: R[A] = reader[A].get
  def getChildren[A: Reads, B]: R[Seq[Path[B]]] = reader[A].getChildren[B]
  def toOption[A: Reads]: Option[A] = reader[A].toOption
  def getOrElse[A: Reads](a: => A): A = reader[A].getOrElse(a)
  def watch[A: Reads](updated: Option[R[A]] => Unit) = reader[A].watch(updated)
  def watchChildren[A: Reads](updated: A => Unit, removed: A => Unit) =
    reader[A].watchChildren(updated, removed)

  /** Asks the client to create the path `dir`; a KeeperException becomes a Left. */
  def createDir = catcher.either(client.createPath(dir))

  def create[A: Writes](value: A, mode: CreateMode) = writer[A](value).create(mode)
  def set[A: Writes](value: A) = writer[A](value).set
  def createAndSet[A: Writes](value: A, mode: CreateMode) = writer[A](value).createAndSet(mode)
  def delete: R[Unit] = deleter.delete()

  /** Read-side operations on `path`, decoding node data with `Reads[A]`. */
  class Reader[A: Reads] {
    /** The same client, positioned at `dir`. */
    def parent: WithZk = WithZk(dir, client)

    /** Fetches the node data and decodes it; either step may produce a Left. */
    def get: R[A] = for {
      data <- catcher.either(client.get(path))
      r <- implicitly[Reads[A]].reads(data)
    } yield r

    /** Lists the node's children, each as a `Path[B]` built from `path + "/" + child`. */
    def getChildren[B]: R[Seq[Path[B]]] = {
      catcher.either(client.getChildren(path))
        .map(names => names.map(childPathSuffix => Path[B](path + "/" + childPathSuffix)))
    }

    /** `get`, discarding any failure. */
    def toOption: Option[A] = get.right.toOption

    /** `get`, falling back to `b` on any failure. */
    def getOrElse[B >: A](b: => B): B = toOption.getOrElse(b)

    /**
     * Registers a callback through `client.watchNode`. Data handed to the callback is
     * decoded with `Reads[A]` before being passed to `updated`; `None` stays `None`.
     */
    def watch(updated: Option[R[A]] => Unit) =
      client.watchNode(path, { data: Option[Array[Byte]] =>
        updated(data.map(implicitly[Reads[A]].reads(_)))
      })

    /**
     * Watches the children of `path` through `client.watchChildrenWithData`.
     *
     * NOTE(review): this assumes the client keeps the map it is given in sync with the
     * node's children and invokes the notifier with the affected key after each
     * change -- confirm against the client's documentation.
     *
     * @param updated called with the current value of a key present in the map
     * @param removed called with the last value reported for a key that is no longer
     *                in the map; not called if no value was ever reported for it
     */
    def watchChildren[B: Reads](updated: B => Unit, removed: B => Unit) = {
      import scala.collection.mutable
      val children = mutable.Map[String, B]()
      // Shadow copy of the values already reported, so a removal can still hand the
      // last known value to `removed` once the key is gone from `children`.
      val updates = children.empty

      def notifier(childPath: String): Unit = {
        children.synchronized {
          children.get(childPath) match {
            case Some(update) =>
              updates(childPath) = update
              updated(update)
            case None =>
              // A removal for a key that was never reported is ignored, not an error.
              updates.remove(childPath).foreach(removed)
          }
        }
      }

      // Decoding failures are rethrown, as the client expects a plain value.
      def f(b: Array[Byte]): B = implicitly[Reads[B]].reads(b).fold(e => throw e, identity)

      client.watchChildrenWithData[B](path, children, f(_), notifier(_))
    }
  }

  /** Write-side operations on `path`, encoding `value` with `Writes[A]`. */
  class Writer[A: Writes](value: A) {
    /** The same client, positioned at `dir`. */
    def parent: WithZk = WithZk(dir, client)

    def createDir = WithZk.this.createDir

    /** Creates the node at `path` with the encoded value and the given mode. */
    def create(mode: CreateMode) =
      catcher.either(client.create(path, implicitly[Writes[A]].writes(value), mode))

    /** Sets the data of the node at `path` to the encoded value. */
    def set = catcher.either(client.set(path, implicitly[Writes[A]].writes(value)))

    /**
     * Sets value, creating path if it does not exist.
     *
     * Steps: `createDir` first (its failure is returned as is); then `set`. If `set`
     * succeeds the result is `Right(path)`. If `set` fails -- for any caught reason,
     * its error is discarded -- `create(mode)` is attempted and its outcome decides
     * the result.
     */
    def createAndSet(mode: CreateMode): R[String] = {
      for {
        dirCreated <- createDir
        // Left projection: continue only when `set` failed; a success short-circuits.
        noNode <- set.map(_ => path).left
        created <- create(mode)
      } yield path
    }
  }

  /** Delete operation on `path`. */
  class Deleter {
    /** Deletes the node at `path`; a KeeperException becomes a Left. */
    def delete(): R[Unit] = catcher.either(client.delete(path))
  }
}
/**
 * Yet another ZooKeeper DSL:
 * {{{
 * import WithZk._
 *
 * val client: ZooKeeperClient
 *
 * // Read a String from a known path:
 * client.atPath("/foo").reader[String].get
 *
 * // Write a String to a known path:
 * client.atPath("/foo").writer("bar").createAndSet(CreateMode.PERSISTENT)
 *
 * // If an object of type A can be implicitly converted to a Path[A] then you can:
 *
 * case class Foo(bar: String, baz: String)
 * implicit def fooToPath(foo: Foo): Path[Foo] = Path("/foo/%s".format(foo.bar))
 *
 * val foo = Foo("bar", "baz")
 * client.atPathFor(foo).writer(foo.baz).set // Zk node /foo/bar = baz
 * client.atPathFor(foo).reader[Foo].get // Foo("bar", "baz")
 * }}}
 */
object WithZk {
  /** A node path tagged with the type `A` it is associated with. */
  case class Path[+A](value: String)

  /** Decodes node data into an `A`, reporting failure as a `Left`. */
  trait Reads[A] {
    def reads(in: Array[Byte]): Either[Throwable, A]
  }

  /** Encodes an `A` into node data. */
  trait Writes[A] {
    def writes(value: A): Array[Byte]
  }

  /** Both directions for the same type. */
  trait Format[A] extends Reads[A] with Writes[A]

  /**
   * Format for `String`, delegating to `utf8Encoder`.
   * NOTE(review): `utf8Encoder` and the `.right` syntax come from outside this
   * file -- presumably a UTF-8 codec and an Either-wrapping extension; confirm.
   */
  implicit object StringFormat extends Format[String] {
    def reads(in: Array[Byte]): Either[Throwable, String] = utf8Encoder.decode(in).right
    def writes(value: String): Array[Byte] = utf8Encoder.encode(value)
  }

  /** Builds a [[WithZk]] for `path` on `client`. */
  def withZk(path: String, client: ZooKeeperClient): WithZk = WithZk(path, client)

  /*
   * def meh[A](a: A)(implicit conv: A => B): C means
   * if A can be implicitly converted to a B, then convert that B to a C.
   * (This is the explicit spelling of the view bound `A <% B`.)
   */
  /** Adds the `atPath` / `atPathFor` entry points to a client. */
  case class AtPath(client: ZooKeeperClient) {
    /** Positions the client at a literal path. */
    def atPath(path: String): WithZk = withZk(path, client)

    /** Positions the client at a typed path. */
    def atPath[A](path: Path[A]): WithZk = withZk(path.value, client)

    /** Positions the client at the path that `path` implicitly converts to. */
    def atPathFor[A](path: A)(implicit toPath: A => Path[A]): WithZk =
      withZk(toPath(path).value, client)
  }

  /** Lets `client.atPath(...)` be written directly on a `ZooKeeperClient`. */
  implicit def clientToWithZk(client: ZooKeeperClient): AtPath = AtPath(client)

  /** The `Path[A]` that `value` implicitly converts to. */
  def pathOf[A](value: A)(implicit toPath: A => Path[A]): Path[A] = toPath(value)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment