Skip to content

Instantly share code, notes, and snippets.

@mardambey
Created May 10, 2012 16:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save mardambey/2654382 to your computer and use it in GitHub Desktop.
Save mardambey/2654382 to your computer and use it in GitHub Desktop.
Reset Kafka offsets in ZooKeeper by deleting the corresponding nodes.
/**
* Notes: This code uses AsyncValue[T], a custom class that uses actors
* to allow concurrent operations on the provided type. It can be replaced
* by an Atomic object from the java.util.concurrent package or something
* that provides similar functionality.
*/
/**
 * Resets the offsets for the given group / topic pair.
 * This works by deleting the offsets recorded in
 * ZooKeeper for that pair.
 *
 * Failures (no ZooKeeper host known for the topic, connection not
 * established in time, a node that cannot be deleted) are logged and
 * do not abort the remaining deletions.
 *
 * @param groupId consumer group whose recorded offsets are deleted
 * @param topic   topic whose recorded offsets are deleted
 */
def resetOffset(groupId:String, topic:String): Unit = {
  // if we can't get a host for the topic we can only log and give up
  getZkForTopic(topic) match {
    case Some(host) =>
      val connected = new AsyncValue[Boolean](false)
      // ZkWatcher flips `connected` once the session reaches SyncConnected
      val zk = new ZooKeeper(host, 10000, new ZkWatcher(connected))
      try {
        // poll for the connection: at most 10 attempts, 100 ms apart
        var retry = 10
        while (!connected.get() && retry > 0) {
          Thread.sleep(100)
          retry -= 1
        }
        if (connected.get()) {
          getOffsetPaths(groupId, topic, zk).foreach { path =>
            try {
              log.fine("Deleting path from zk: " + path)
              zk.delete(path, -1) // no need for version match hence -1
            } catch {
              // keep going with the remaining paths, but record why this one failed
              case e: Exception => log.severe("Could not delete path " + path + " from zk: " + e)
            }
          }
        } else {
          // failed to connect
          log.severe("Failed to connect to zookeeper on " + host)
        }
      } finally {
        // always release the session, even if waiting or deleting threw
        zk.close()
      }
    case _ =>
      log.severe("Could not reset offset for group " + groupId + ", topic " + topic + ": no zkHost found")
  }
}
/**
 * Returns a list containing the full paths
 * in ZooKeeper of the offsets for the given
 * group / topic pair.
 *
 * Any exception raised while listing the children (for example when
 * the parent node does not exist) yields an empty list. Fatal JVM
 * errors are not caught and propagate to the caller.
 *
 * @param groupId consumer group used to build the parent path
 * @param topic   topic used to build the parent path
 * @param zk      ZooKeeper handle used to list the children
 * @return one "parent/child" path per child node, or an empty list
 */
def getOffsetPaths(groupId:String, topic:String, zk:ZooKeeper) : List[String] = {
  // build path where offsets are stored
  val parent = "/consumers/%s/offsets/%s".format(groupId, topic)
  try {
    // no need for a zk watch on the parent (hence false)
    zk.getChildren(parent, false).map(child => "%s/%s".format(parent, child)).toList
  } catch {
    // only swallow ordinary exceptions; let Errors (OOM, etc.) escape
    case _: Exception => List[String]()
  }
}
/**
 * Minimal ZooKeeper watcher: when it receives an event of type None
 * whose state is SyncConnected, it sets the supplied AsyncValue to
 * true so the caller can tell the connection is ready. Every other
 * event is ignored.
 */
private class ZkWatcher(connected:AsyncValue[Boolean]) extends Watcher {
  def process(event:WatchedEvent): Unit = {
    // We are being told that the state of the connection has changed to SyncConnected
    if (event.getType() == Event.EventType.None &&
        event.getState() == Event.KeeperState.SyncConnected) {
      connected.set(true)
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment