Skip to content

Instantly share code, notes, and snippets.

@craftslab
Last active August 8, 2021 15:37
Show Gist options
  • Save craftslab/72c311816446cc14e03d7312b86a2ec4 to your computer and use it in GitHub Desktop.
Save craftslab/72c311816446cc14e03d7312b86a2ec4 to your computer and use it in GitHub Desktop.
etcd client

Requirement

  • etcd >= 3.5.0

Run

# Start a local etcd server
etcd --advertise-client-urls http://127.0.0.1:2379 --listen-client-urls http://127.0.0.1:2379

# Write to etcd
ETCDCTL_API=3 etcdctl --endpoints=127.0.0.1:2379 put "/path/to/key" "val"

# Read from etcd
ETCDCTL_API=3 etcdctl --endpoints=127.0.0.1:2379 get "/" --keys-only=true --prefix
ETCDCTL_API=3 etcdctl --endpoints=127.0.0.1:2379 get "/path/to/" --prefix
ETCDCTL_API=3 etcdctl --endpoints=127.0.0.1:2379 get "/path/to/key"

# Delete from etcd
ETCDCTL_API=3 etcdctl --endpoints=127.0.0.1:2379 del "/path/to/key"

# Watch on etcd
ETCDCTL_API=3 etcdctl --endpoints=127.0.0.1:2379 watch "/path/to" --prefix

Reference

// Module definition for the etcd client example programs. It requires the
// etcd v3.5.0 api, client/pkg and client packages together with gRPC v1.39.1.
module github.com/craftslab/etcdclient
go 1.16
require (
go.etcd.io/etcd/api/v3 v3.5.0
go.etcd.io/etcd/client/pkg/v3 v3.5.0
go.etcd.io/etcd/client/v3 v3.5.0
google.golang.org/grpc v1.39.1
gopkg.in/yaml.v2 v2.4.0 // indirect
)
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"log"
"os"
"time"
"go.etcd.io/etcd/api/v3/mvccpb"
"go.etcd.io/etcd/client/v3"
"google.golang.org/grpc/grpclog"
)
// key is the etcd key that main puts, gets, deletes, and watches.
const key = "/path/to/key"

// val is the value main stores under key.
const val = "val"
// main runs a short demo against a local etcd server: for about five seconds
// a background loop puts, gets, and deletes key once per second while the main
// goroutine logs every watch event reported for that key.
func main() {
	grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stderr, os.Stderr, os.Stderr))

	c, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatalf("failed to new client: %v", err)
	}
	defer func() {
		if err := c.Close(); err != nil {
			log.Printf("failed to close client: %v", err)
		}
	}()

	// A single context bounds the demo: when it is done the writer loop stops,
	// and (per the etcd Watch documentation) the watch channel is closed, which
	// ends the range loop below.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	watcher := clientv3.NewWatcher(c)
	defer func() {
		if err := watcher.Close(); err != nil {
			log.Printf("failed to close watcher: %v", err)
		}
	}()

	// Request the watch before the writer starts so the writer's first
	// operations are not issued ahead of the watch request.
	rspChan := watcher.Watch(ctx, key)

	kv := clientv3.NewKV(c)
	writerDone := make(chan struct{})
	go func() {
		defer close(writerDone)
		for {
			// Errors are kept local to this goroutine instead of being written
			// to main's err. Failures seen after ctx is done are expected
			// shutdown noise and are not logged.
			if _, err := kv.Put(ctx, key, val); err != nil && ctx.Err() == nil {
				log.Printf("failed to put: %v", err)
			}

			rsp, err := kv.Get(ctx, key)
			switch {
			case err != nil:
				if ctx.Err() == nil {
					log.Printf("failed to get: %v", err)
				}
			case len(rsp.Kvs) != 0:
				log.Printf("get kv: %s %s", rsp.Kvs[0].Key, rsp.Kvs[0].Value)
			}

			if _, err := kv.Delete(ctx, key); err != nil && ctx.Err() == nil {
				log.Printf("failed to delete: %v", err)
			}

			// Pause for a second between rounds, but wake up immediately
			// when the demo is over.
			select {
			case <-ctx.Done():
				return
			case <-time.After(1 * time.Second):
			}
		}
	}()

	for item := range rspChan {
		if err := item.Err(); err != nil {
			// A failed response carries no events to print; keep ranging
			// until the channel is closed.
			log.Printf("watch failed: %v", err)
			continue
		}
		for _, event := range item.Events {
			switch event.Type {
			case mvccpb.PUT:
				log.Printf("watch kv: put %s %s", event.Kv.Key, event.Kv.Value)
			case mvccpb.DELETE:
				log.Printf("watch kv: delete %s", event.Kv.Key)
			}
		}
	}

	// The watch channel can close before the deadline, so stop the writer
	// explicitly and wait for it before the deferred closes tear the client
	// down underneath it.
	cancel()
	<-writerDone

	log.Println("client exiting")
}
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"log"
"os"
"time"
"go.etcd.io/etcd/api/v3/mvccpb"
"go.etcd.io/etcd/client/pkg/v3/transport"
"go.etcd.io/etcd/client/v3"
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"
)
// key is the etcd key used for every put, get, delete, and watch in main;
// val is the value written under it.
const key, val = "/path/to/key", "val"
// main runs a short demo against a local etcd server using a leased key: it
// grants a 30-second lease and keeps it alive, then for about five seconds a
// background loop puts (with the lease), gets, and deletes key once per second
// while the main goroutine logs every watch event reported for that key.
func main() {
	grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stderr, os.Stderr, os.Stderr))

	// Fill in these paths to connect to a TLS-enabled cluster.
	tlsInfo := transport.TLSInfo{
		CertFile:      "",
		KeyFile:       "",
		TrustedCAFile: "",
	}
	tlsConfig, err := tlsInfo.ClientConfig()
	if err != nil {
		log.Fatalf("failed to config: %v", err)
	}
	if tlsInfo.CertFile == "" && tlsInfo.KeyFile == "" && tlsInfo.TrustedCAFile == "" {
		// No TLS material configured: pass a nil TLS config, exactly as
		// before, so the client is not handed TLS settings for the plain
		// 127.0.0.1:2379 endpoint.
		tlsConfig = nil
	}

	client, err := clientv3.New(clientv3.Config{
		Context:           context.Background(),
		Endpoints:         []string{"127.0.0.1:2379"},
		DialTimeout:       3 * time.Second,
		DialKeepAliveTime: 3 * time.Second,
		DialOptions:       []grpc.DialOption{grpc.WithBlock()},
		TLS:               tlsConfig,
		Username:          "",
		Password:          "",
	})
	if err != nil {
		log.Fatalf("failed to new: %v", err)
	}
	defer func() {
		if err := client.Close(); err != nil {
			log.Printf("failed to close client: %v", err)
		}
	}()

	leaser := clientv3.NewLease(client)
	defer func() {
		if err := leaser.Close(); err != nil {
			log.Printf("failed to close lease client: %v", err)
		}
	}()

	// A single context bounds the demo: when it is done the keep-alive and the
	// writer loop stop, and (per the etcd Watch documentation) the watch
	// channel is closed, which ends the range loop below.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	ttlDuration := 30 * time.Second
	lease, err := leaser.Grant(ctx, int64(ttlDuration.Seconds()))
	if err != nil {
		log.Fatalf("failed to grant: %v", err)
	}

	keepAlive, err := leaser.KeepAlive(ctx, lease.ID)
	if err != nil {
		log.Fatalf("failed to alive: %v", err)
	}
	// Consume keep-alive responses: the etcd documentation notes that an
	// unread channel can fill up and cause responses to be dropped. The loop
	// ends when the channel is closed.
	go func() {
		for range keepAlive {
		}
	}()

	watcher := clientv3.NewWatcher(client)
	defer func() {
		if err := watcher.Close(); err != nil {
			log.Printf("failed to close watcher: %v", err)
		}
	}()

	// Request the watch before the writer starts so the writer's first
	// operations are not issued ahead of the watch request.
	rspChan := watcher.Watch(ctx, key)

	kv := clientv3.NewKV(client)
	writerDone := make(chan struct{})
	go func() {
		defer close(writerDone)
		for {
			// Errors are kept local to this goroutine instead of being written
			// to main's err. Failures seen after ctx is done are expected
			// shutdown noise and are not logged.
			if _, err := kv.Put(ctx, key, val, clientv3.WithLease(lease.ID)); err != nil && ctx.Err() == nil {
				log.Printf("failed to put: %v", err)
			}

			rsp, err := kv.Get(ctx, key)
			switch {
			case err != nil:
				if ctx.Err() == nil {
					log.Printf("failed to get: %v", err)
				}
			case len(rsp.Kvs) != 0:
				log.Printf("get kv: %s %s", rsp.Kvs[0].Key, rsp.Kvs[0].Value)
			}

			if _, err := kv.Delete(ctx, key); err != nil && ctx.Err() == nil {
				log.Printf("failed to delete: %v", err)
			}

			// Pause for a second between rounds, but wake up immediately
			// when the demo is over.
			select {
			case <-ctx.Done():
				return
			case <-time.After(1 * time.Second):
			}
		}
	}()

	for item := range rspChan {
		if err := item.Err(); err != nil {
			// A failed response carries no events to print; keep ranging
			// until the channel is closed.
			log.Printf("watch failed: %v", err)
			continue
		}
		for _, event := range item.Events {
			switch event.Type {
			case mvccpb.PUT:
				log.Printf("watch kv: put %s %s", event.Kv.Key, event.Kv.Value)
			case mvccpb.DELETE:
				log.Printf("watch kv: delete %s", event.Kv.Key)
			}
		}
	}

	// The watch channel can close before the deadline, so stop the writer
	// explicitly and wait for it before the deferred closes tear the client
	// down underneath it.
	cancel()
	<-writerDone

	log.Println("client exiting")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment