Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
package main
import (
"context"
"fmt"
_ "net/http/pprof"
"time"
"github.com/pingcap/log"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
)
// watch binds key to a freshly granted 5-second lease, renews that lease once,
// and then blocks until a watch on key (with put events filtered out) delivers
// its first event. It logs the key together with the time elapsed since the
// renewal completed.
//
// Any failure from Grant, Put, KeepAliveOnce or the watch stream is returned to
// the caller, wrapped with the step that failed. If the watch channel closes
// without delivering an event, watch returns nil.
func watch(ctx context.Context, cli *clientv3.Client, key string) error {
	// Cancelling on return releases the watch registered below once we are done.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	lease, err := cli.Grant(ctx, 5)
	if err != nil {
		return fmt.Errorf("granting lease for %q: %w", key, err)
	}
	if _, err := cli.Put(ctx, key, "test value", clientv3.WithLease(lease.ID)); err != nil {
		return fmt.Errorf("putting %q: %w", key, err)
	}
	if _, err := cli.KeepAliveOnce(ctx, lease.ID); err != nil {
		return fmt.Errorf("renewing lease for %q: %w", key, err)
	}

	// The reported duration is measured from this point.
	start := time.Now()
	for resp := range cli.Watch(ctx, key, clientv3.WithFilterPut()) {
		if err := resp.Err(); err != nil {
			return fmt.Errorf("watching %q: %w", key, err)
		}
		if len(resp.Events) == 0 {
			// Responses without events carry nothing to report; keep waiting.
			continue
		}
		event := resp.Events[0]
		log.Info("watch found key deleted", zap.String("key", string(event.Kv.Key)), zap.Duration("duration", time.Since(start)))
		return nil
	}
	return nil
}
// test dials the etcd endpoint at http://127.0.0.1:2379 and launches 20
// concurrent watch calls, one per key "/test/lease/<i>", starting them 50ms
// apart. It blocks until every call has returned and panics if the client
// cannot be created or any watch call reports an error.
func test(ctx context.Context) {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"http://127.0.0.1:2379"},
		Context:     ctx,
		DialTimeout: 10 * time.Second,
		DialOptions: []grpc.DialOption{
			grpc.WithBlock(),
			grpc.WithConnectParams(grpc.ConnectParams{
				Backoff: backoff.Config{
					BaseDelay:  time.Second,
					Multiplier: 1.1,
					Jitter:     0.1,
					MaxDelay:   3 * time.Second,
				},
				MinConnectTimeout: 5 * time.Second,
			}),
		},
	})
	if err != nil {
		log.Panic(err.Error())
	}
	// Release the client's connections on the way out; the Close error is
	// deliberately ignored because nothing useful can be done with it here.
	defer cli.Close()

	log.Info("start")
	errg, ctx := errgroup.WithContext(ctx)
	for i := 0; i < 20; i++ {
		// key is declared per iteration, so the closure below captures its own copy.
		key := fmt.Sprintf("/test/lease/%d", i)
		// Space out the start of each watcher.
		time.Sleep(time.Millisecond * 50)
		errg.Go(func() error {
			return watch(ctx, cli, key)
		})
	}
	if err := errg.Wait(); err != nil {
		log.Panic(err.Error())
	}
}
// main runs the lease/watch scenario under a root background context.
func main() {
	root := context.Background()
	test(root)
}
package main
import (
"context"
"fmt"
_ "net/http/pprof"
"time"
"github.com/pingcap/log"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
)
// watch binds key to a freshly granted 5-second lease, renews that lease once,
// and then blocks until a watch on key (with put events filtered out) delivers
// its first event. It logs the key together with the time elapsed since the
// renewal completed.
//
// Any failure from Grant, Put, KeepAliveOnce or the watch stream is returned to
// the caller, wrapped with the step that failed. If the watch channel closes
// without delivering an event, watch returns nil.
func watch(ctx context.Context, cli *clientv3.Client, key string) error {
	// Cancelling on return releases the watch registered below once we are done.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	lease, err := cli.Grant(ctx, 5)
	if err != nil {
		return fmt.Errorf("granting lease for %q: %w", key, err)
	}
	if _, err := cli.Put(ctx, key, "test value", clientv3.WithLease(lease.ID)); err != nil {
		return fmt.Errorf("putting %q: %w", key, err)
	}
	if _, err := cli.KeepAliveOnce(ctx, lease.ID); err != nil {
		return fmt.Errorf("renewing lease for %q: %w", key, err)
	}

	// The reported duration is measured from this point.
	start := time.Now()
	for resp := range cli.Watch(ctx, key, clientv3.WithFilterPut()) {
		if err := resp.Err(); err != nil {
			return fmt.Errorf("watching %q: %w", key, err)
		}
		if len(resp.Events) == 0 {
			// Responses without events carry nothing to report; keep waiting.
			continue
		}
		event := resp.Events[0]
		log.Info("watch found key deleted", zap.String("key", string(event.Kv.Key)), zap.Duration("duration", time.Since(start)))
		return nil
	}
	return nil
}
// test dials the etcd endpoint at http://127.0.0.1:2379 and launches 4000
// concurrent watch calls, one per key "/test/lease/<i>", all started
// back-to-back with no pacing. It blocks until every call has returned and
// panics if the client cannot be created or any watch call reports an error.
func test(ctx context.Context) {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"http://127.0.0.1:2379"},
		Context:     ctx,
		DialTimeout: 10 * time.Second,
		DialOptions: []grpc.DialOption{
			grpc.WithBlock(),
			grpc.WithConnectParams(grpc.ConnectParams{
				Backoff: backoff.Config{
					BaseDelay:  time.Second,
					Multiplier: 1.1,
					Jitter:     0.1,
					MaxDelay:   3 * time.Second,
				},
				MinConnectTimeout: 5 * time.Second,
			}),
		},
	})
	if err != nil {
		log.Panic(err.Error())
	}
	// Release the client's connections on the way out; the Close error is
	// deliberately ignored because nothing useful can be done with it here.
	defer cli.Close()

	log.Info("start")
	errg, ctx := errgroup.WithContext(ctx)
	for i := 0; i < 4000; i++ {
		// key is declared per iteration, so the closure below captures its own copy.
		key := fmt.Sprintf("/test/lease/%d", i)
		errg.Go(func() error {
			return watch(ctx, cli, key)
		})
	}
	if err := errg.Wait(); err != nil {
		log.Panic(err.Error())
	}
}
// main runs the high-fan-out lease/watch scenario under a root background
// context.
func main() {
	root := context.Background()
	test(root)
}
@amyangfei

This comment has been minimized.

Copy link
Owner Author

@amyangfei amyangfei commented Jan 1, 2021

➜ go run main.go
[2021/01/01 16:23:31.480 +08:00] [INFO] [main.go:71] [start]
[2021/01/01 16:23:36.752 +08:00] [INFO] [main.go:40] ["watch found key deleted"] [key=/test/lease/3] [duration=5.05171451s]
[2021/01/01 16:23:36.789 +08:00] [INFO] [main.go:40] ["watch found key deleted"] [key=/test/lease/1] [duration=5.205134643s]
[2021/01/01 16:23:36.789 +08:00] [INFO] [main.go:40] ["watch found key deleted"] [key=/test/lease/0] [duration=5.219573689s]
[2021/01/01 16:23:36.789 +08:00] [INFO] [main.go:40] ["watch found key deleted"] [key=/test/lease/2] [duration=5.148121708s]
[2021/01/01 16:23:37.257 +08:00] [INFO] [main.go:40] ["watch found key deleted"] [key=/test/lease/8] [duration=5.280757515s]
[2021/01/01 16:23:37.257 +08:00] [INFO] [main.go:40] ["watch found key deleted"] [key=/test/lease/7] [duration=5.328015652s]
[2021/01/01 16:23:37.266 +08:00] [INFO] [main.go:40] ["watch found key deleted"] [key=/test/lease/10] [duration=5.168516195s]
[2021/01/01 16:23:37.266 +08:00] [INFO] [main.go:40] ["watch found key deleted"] [key=/test/lease/9] [duration=5.219598472s]
[2021/01/01 16:23:37.266 +08:00] [INFO] [main.go:40] ["watch found key deleted"] [key=/test/lease/11] [duration=5.106616868s]
[2021/01/01 16:23:37.266 +08:00] [INFO] [main.go:40] ["watch found key deleted"] [key=/test/lease/4] [duration=5.514421217s]
[2021/01/01 16:23:37.266 +08:00] [INFO] [main.go:40] ["watch found key deleted"] [key=/test/lease/6] [duration=5.401137507s]
[2021/01/01 16:23:37.266 +08:00] [INFO] [main.go:40] ["watch found key deleted"] [key=/test/lease/5] [duration=5.456998875s]
[2021/01/01 16:23:37.266 +08:00] [INFO] [main.go:40] ["watch found key deleted"] [key=/test/lease/12] [duration=5.05656063s]
[2021/01/01 16:23:37.782 +08:00] [INFO] [main.go:40] ["watch found key deleted"] [key=/test/lease/19] [duration=5.159952017s]
[2021/01/01 16:23:37.822 +08:00] [INFO] [main.go:40] ["watch found key deleted"] [key=/test/lease/14] [duration=5.501389392s]
[2021/01/01 16:23:37.822 +08:00] [INFO] [main.go:40] ["watch found key deleted"] [key=/test/lease/16] [duration=5.388318752s]
[2021/01/01 16:23:37.822 +08:00] [INFO] [main.go:40] ["watch found key deleted"] [key=/test/lease/18] [duration=5.252221555s]
[2021/01/01 16:23:37.822 +08:00] [INFO] [main.go:40] ["watch found key deleted"] [key=/test/lease/13] [duration=5.557504571s]
[2021/01/01 16:23:37.822 +08:00] [INFO] [main.go:40] ["watch found key deleted"] [key=/test/lease/15] [duration=5.443666787s]
[2021/01/01 16:23:37.822 +08:00] [INFO] [main.go:40] ["watch found key deleted"] [key=/test/lease/17] [duration=5.334491651s]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment