Skip to content

Instantly share code, notes, and snippets.

@taterbase
Created June 21, 2016 01:42
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save taterbase/c349d8facf40564e7d54f53ec56c6960 to your computer and use it in GitHub Desktop.
Save taterbase/c349d8facf40564e7d54f53ec56c6960 to your computer and use it in GitHub Desktop.
Recursively watch child Zookeeper graph grow
package main
import (
"log"
"path"
"sync"
"time"
"github.com/samuel/go-zookeeper/zk"
)
var (
zk_addrs = []string{"127.0.0.1"}
zk_conn_timeout = time.Second
)
type ZKScanner struct {
conn *zk.Conn
total uint
Event <-chan string
}
func NewZKScanner(conn *zk.Conn) *ZKScanner {
return &ZKScanner{conn: conn}
}
func (s *ZKScanner) watch(ch <-chan zk.Event) {
event := <-ch
if event.Err != nil {
log.Fatal("Child Watch Event Error:", event.Err)
}
if event.Type == zk.EventNodeDeleted {
log.Println("Child Node Removed:", event.Path)
return
}
log.Println("Child changed:", event.Path)
err := s.partialScan(event.Path)
if err != nil {
log.Fatal("Error on partial scan:", err)
}
}
func (s *ZKScanner) scan(prefix, node string) error {
node = path.Join(prefix, node)
children, _, ch, err := s.conn.ChildrenW(node)
if err != nil {
return err
}
go s.watch(ch)
for _, child := range children {
err := s.scan(node, child)
if err != nil {
return err
}
}
return nil
}
func (s *ZKScanner) partialScan(node string) error {
children, parent_stat, ch, err := s.conn.ChildrenW(node)
if err != nil {
return err
}
go s.watch(ch)
for _, child := range children {
_, stat, err := s.conn.Get(path.Join(node, child))
if err != nil {
return err
}
if stat.Czxid == parent_stat.Pzxid {
s.scan(node, child)
}
}
return nil
}
func main() {
c, _, err := zk.Connect(zk_addrs, zk_conn_timeout)
if err != nil {
log.Fatal("Unable to connect:", err)
}
scanner := NewZKScanner(c)
err = scanner.scan("/", "")
if err != nil {
log.Fatal("Unable to scan nodes:", err)
}
wg := &sync.WaitGroup{}
wg.Add(1)
wg.Wait()
log.Println("finished")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment