Skip to content

Instantly share code, notes, and snippets.

@codemartial
Last active February 27, 2017 08:52
Show Gist options
  • Save codemartial/46ea092dd7dbc101969f015a361490c4 to your computer and use it in GitHub Desktop.
Save codemartial/46ea092dd7dbc101969f015a361490c4 to your computer and use it in GitHub Desktop.
A basic checkpointing watcher for etcd.
package main
import (
"context"
"fmt"
etcc "github.com/coreos/etcd/clientv3"
"strconv"
"time"
)
func WatchETCD(watcherID, keyspace string, etcd *etcc.Client, handle func(*etcc.Event)) error {
var lastRev int64
checkpt := watcherID + "lastRev"
resp, err := etcd.Get(context.Background(), checkpt)
if err != nil {
return err
}
for _, evt := range resp.Kvs {
val, err := strconv.Atoi(string(evt.Value))
if err != nil {
return fmt.Errorf("could not decode checkpoint revision: %s", string(evt.Value))
}
lastRev = int64(val) + 1
}
wchan := etcd.Watch(context.Background(), keyspace, etcc.WithPrefix(), etcc.WithRev(lastRev))
for wresp := range wchan {
for _, evt := range wresp.Events {
handle(evt)
if _, err := etcd.Put(context.Background(), checkpt, strconv.FormatInt(evt.Kv.ModRevision, 10)); err != nil {
return err
}
}
}
return nil
}
func main() {
etcd, err := etcc.New(etcc.Config{
Endpoints: []string{"localhost:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
fmt.Println(err)
return
}
defer etcd.Close()
if err := WatchETCD("", "/", etcd, func(evt *etcc.Event) { fmt.Printf("%+v\n", evt) }); err != nil {
fmt.Println(err)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment