Skip to content

Instantly share code, notes, and snippets.

@jameshartig
Last active October 16, 2018 15:26
Show Gist options
  • Save jameshartig/e9ff2774926f864d06e3be1f9edd4000 to your computer and use it in GitHub Desktop.
Save jameshartig/e9ff2774926f864d06e3be1f9edd4000 to your computer and use it in GitHub Desktop.
Reproduction for #1177
package main
import (
"context"
"flag"
"math/rand"
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"runtime"
"sync"
"sync/atomic"
"time"
"github.com/levenlabs/go-llog"
"cloud.google.com/go/pubsub"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// isGoogleCloudErrAlreadyExists reports whether err is recognized by
// status.FromError as a gRPC status error whose code is codes.AlreadyExists.
// A nil err, or an error that is not a status error, yields false.
func isGoogleCloudErrAlreadyExists(err error) bool {
	if err != nil {
		if st, isStatus := status.FromError(err); isStatus {
			return st.Code() == codes.AlreadyExists
		}
	}
	return false
}
func main() {
proj := flag.String("project", "admiral-1007", "project name")
topic := flag.String("topic", "pubsub_test", "name of the topic to use")
consumers := flag.Int("num-consumers", 1000, "number of consumers")
flag.Parse()
go func() {
http.ListenAndServe("127.0.0.1:9999", nil)
}()
psc, err := pubsub.NewClient(context.Background(), *proj)
if err != nil {
panic(err)
}
t, err := psc.CreateTopic(context.Background(), *topic)
if err != nil && !isGoogleCloudErrAlreadyExists(err) {
panic(err)
} else if t == nil {
t = psc.Topic(*topic)
}
sub, err := psc.CreateSubscription(context.Background(), *topic+"_test", pubsub.SubscriptionConfig{
Topic: t,
AckDeadline: 15 * time.Second,
})
if err != nil && !isGoogleCloudErrAlreadyExists(err) {
panic(err)
} else if sub == nil {
sub = psc.Subscription(*topic + "_test")
}
ctx, cancel := context.WithCancel(context.Background())
wg := new(sync.WaitGroup)
for i := 0; i < 25; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
_, err := t.Publish(ctx, &pubsub.Message{
// just something from json-generator.com
Data: []byte(`{"_id":"5bc522c300adfb89aa57aa46","index":0,"guid":"7068a7a9-9fb1-4fc6-867a-60144f0a4312","greeting":"Hello, undefined! You have 5 unread messages.","favoriteFruit":"banana"}`),
}).Get(ctx)
if ctx.Err() == context.Canceled {
return
}
if err != nil {
llog.Warn("error publishing", llog.ErrKV(err))
time.Sleep(time.Second)
}
}
}()
}
sub.ReceiveSettings.MaxOutstandingMessages = *consumers
sub.ReceiveSettings.NumGoroutines = runtime.NumCPU() * 10
sub.ReceiveSettings.MaxExtension = 15 * time.Second
var count int64
go func() {
tick := time.NewTicker(time.Minute)
for {
select {
case <-tick.C:
v := atomic.SwapInt64(&count, 0)
llog.Info("count received last minute", llog.KV{"count": v})
case <-ctx.Done():
return
}
}
}()
wg.Add(1)
go func() {
defer wg.Done()
for {
err := sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
time.Sleep(time.Duration(300+rand.Int63n(200)) * time.Millisecond)
msg.Ack()
atomic.AddInt64(&count, 1)
})
if ctx.Err() == context.Canceled {
return
}
if err != nil {
llog.Warn("error receiving", llog.ErrKV(err))
time.Sleep(time.Second)
}
}
}()
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
<-c
cancel()
wg.Wait()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment