Skip to content

Instantly share code, notes, and snippets.

@jameshartig
Last active July 10, 2019 01:26
Show Gist options
  • Save jameshartig/95c1148db456190fc61034accf3e85d1 to your computer and use it in GitHub Desktop.
Save jameshartig/95c1148db456190fc61034accf3e85d1 to your computer and use it in GitHub Desktop.
Reproduction for #1485
package main
import (
"context"
"flag"
"math/rand"
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"sync"
"sync/atomic"
"time"
"go.opencensus.io/stats/view"
"cloud.google.com/go/pubsub"
"github.com/levenlabs/go-llog"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// isGoogleCloudErrAlreadyExists reports whether err can be interpreted as a
// gRPC status whose code is codes.AlreadyExists. A nil error, or an error that
// status.FromError does not recognize, yields false.
func isGoogleCloudErrAlreadyExists(err error) bool {
	if err == nil {
		return false
	}
	st, isStatus := status.FromError(err)
	if !isStatus {
		return false
	}
	return st.Code() == codes.AlreadyExists
}
// ocstat is an int64 that acts as an OpenCensus view exporter: every accepted
// export overwrites it with the sum found in the first row of the exported
// data. It is written atomically so that readers can use atomic loads on the
// underlying int64.
type ocstat int64

// ExportView implements the view.Exporter interface.
//
// It stores the value of the first row of d, truncated to an int64. Exports
// with no rows, or whose first row does not hold a *view.SumData, are ignored.
func (s *ocstat) ExportView(d *view.Data) {
	if len(d.Rows) < 1 {
		return
	}
	// A registered exporter may be handed data for views with other
	// aggregation types; a bare type assertion would panic on those, so
	// check the dynamic type and skip anything that is not a sum.
	sum, ok := d.Rows[0].Data.(*view.SumData)
	if !ok {
		return
	}
	atomic.StoreInt64((*int64)(s), int64(sum.Value))
}
func main() {
proj := flag.String("project", "admiral-1007", "project name")
topic := flag.String("topic", "pubsub_test", "name of the topic to use")
outstanding := flag.Int("num-outstanding", 1000, "number of oustanding messages")
numPublish := flag.Int("num-publish", 1000000, "number of jobs to publish")
flag.Parse()
go func() {
// useful for pprof
http.ListenAndServe("127.0.0.1:9999", nil)
}()
// the client, topic and subscription
psc, err := pubsub.NewClient(context.Background(), *proj)
if err != nil {
panic(err)
}
t, err := psc.CreateTopic(context.Background(), *topic)
if err != nil && !isGoogleCloudErrAlreadyExists(err) {
panic(err)
} else if t == nil {
t = psc.Topic(*topic)
}
sub, err := psc.CreateSubscription(context.Background(), *topic+"_test", pubsub.SubscriptionConfig{
Topic: t,
AckDeadline: 15 * time.Second,
})
if err != nil && !isGoogleCloudErrAlreadyExists(err) {
panic(err)
} else if sub == nil {
sub = psc.Subscription(*topic + "_test")
}
ctx, cancel := context.WithCancel(context.Background())
var acked int64
var published int64
// setup goroutine to print out stats once a minute
{
var pullCount ocstat
view.SetReportingPeriod(time.Second) // try to make sure we get an export close so it matches up
view.RegisterExporter(&pullCount)
view.Register(pubsub.PullCountView)
var lastPullCount int64
go func() {
tick := time.NewTicker(time.Minute)
for {
select {
case <-tick.C:
a := atomic.SwapInt64(&acked, 0)
p := atomic.SwapInt64(&published, 0)
pc := atomic.LoadInt64((*int64)(&pullCount))
lpc := atomic.SwapInt64(&lastPullCount, pc)
llog.Info("last minute", llog.KV{"acked": a, "pullCount": pc - lpc, "published": p})
case <-ctx.Done():
return
}
}
}()
}
// make a channel so we can start receiving after publishes finish
donePublishing := make(chan bool)
wg := new(sync.WaitGroup)
// publish all of the messages to pubsub
if *numPublish > 0 {
llog.Info("starting publishing")
innerWG := new(sync.WaitGroup)
publishers := 100
per := *numPublish / publishers
for i := 0; i < 100; i++ {
innerWG.Add(1)
wg.Add(1)
go func() {
defer innerWG.Done()
defer wg.Done()
for i := 0; i < per; i++ {
_, err := t.Publish(ctx, &pubsub.Message{
// just something from json-generator.com
Data: []byte(`{"_id":"5bc522c300adfb89aa57aa46","index":0,"guid":"7068a7a9-9fb1-4fc6-867a-60144f0a4312","greeting":"Hello, undefined! You have 5 unread messages.","favoriteFruit":"banana"}`),
}).Get(ctx)
if ctx.Err() == context.Canceled {
return
}
if err != nil {
llog.Warn("error publishing", llog.ErrKV(err))
time.Sleep(time.Second)
} else {
atomic.AddInt64(&published, 1)
}
}
}()
}
// once all of our publishers are done then close the done ch
go func() {
innerWG.Wait()
close(donePublishing)
}()
} else {
close(donePublishing)
}
sub.ReceiveSettings.MaxOutstandingMessages = *outstanding
sub.ReceiveSettings.NumGoroutines = 10
sub.ReceiveSettings.MaxExtension = time.Hour
wg.Add(1)
go func() {
defer wg.Done()
select {
case <-donePublishing:
llog.Info("starting receive")
case <-ctx.Done():
return
}
for {
err := sub.Receive(ctx, func(mctx context.Context, msg *pubsub.Message) {
select {
// 100ms -> ~3 seconds
case <-time.After(time.Duration(100+rand.Int63n(3000)) * time.Millisecond):
msg.Ack()
atomic.AddInt64(&acked, 1)
case <-mctx.Done():
msg.Nack()
}
})
if ctx.Err() == context.Canceled {
return
}
if err != nil {
llog.Warn("error receiving", llog.ErrKV(err))
time.Sleep(time.Second)
}
}
}()
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
<-c
cancel()
wg.Wait()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment