Skip to content

Instantly share code, notes, and snippets.

@jba
Last active March 28, 2018 23:04
Show Gist options
  • Save jba/08cff600886fc42f66af41471f1c2496 to your computer and use it in GitHub Desktop.
Save jba/08cff600886fc42f66af41471f1c2496 to your computer and use it in GitHub Desktop.
// This program receives messages from a Cloud Pub/Sub subscription, acking
// each one after an optional delay, while exporting OpenCensus stats to
// Stackdriver and periodically logging the pubsub counts it observes.
package main
import (
"flag"
"fmt"
"log"
"regexp"
"strings"
"time"
"cloud.google.com/go/pubsub"
"go.opencensus.io/exporter/stackdriver"
"go.opencensus.io/plugin/ocgrpc"
"go.opencensus.io/stats"
"golang.org/x/net/context"
)
// Command-line flags.
var (
// projectID is the value of the -project flag; main exits if it is empty.
projectID = flag.String("project", "", "project ID")
// subscription is the value of the -sub flag; main exits if it is empty.
subscription = flag.String("sub", "", "subscription")
// ackDelay is how long the receive callback in main sleeps before acking
// each message. The default of 0 means no delay.
ackDelay = flag.Duration("ack", 0, "delay to ack each message")
)
// main validates the flags, registers the stats exporters, subscribes to the
// ocgrpc and pubsub views, and then receives from the configured subscription
// until Receive returns. Every message is acked after sleeping for *ackDelay.
func main() {
	flag.Parse()
	if *projectID == "" {
		log.Fatal("need -project")
	}
	if *subscription == "" {
		log.Fatal("need -sub")
	}

	// Register the local logging exporter first, then the Stackdriver one.
	stats.RegisterExporter(newExporter())
	// The local is named sd (not exporter) so that it does not shadow the
	// package-level exporter type for the rest of the function.
	sd, err := stackdriver.NewExporter(stackdriver.Options{ProjectID: *projectID})
	if err != nil {
		log.Fatal(err)
	}
	stats.RegisterExporter(sd)

	views := []*stats.View{
		ocgrpc.RPCClientRequestCountView,
		ocgrpc.RPCClientErrorCountView,
		pubsub.PullCountView,
		pubsub.AckCountView,
		pubsub.NackCountView,
		pubsub.ModAckCountView,
		pubsub.StreamOpenCountView,
		pubsub.StreamRetryCountView,
		pubsub.StreamRequestCountView,
		pubsub.StreamResponseCountView,
	}
	for _, v := range views {
		if err := v.Subscribe(); err != nil {
			log.Fatal(err)
		}
		log.Printf("subscribed to %s", v.Name())
	}

	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, *projectID)
	if err != nil {
		log.Fatal(err)
	}
	// Release the client's resources once Receive has returned. No
	// log.Fatal follows this point, so the deferred call always runs.
	defer func() {
		if err := client.Close(); err != nil {
			log.Printf("closing pubsub client: %v", err)
		}
	}()

	s := client.Subscription(*subscription)
	log.Printf("running with project %s, subscription %s, ack delay %s", *projectID, *subscription, *ackDelay)
	err = s.Receive(ctx, func(_ context.Context, m *pubsub.Message) {
		// Simulate processing time before acknowledging.
		time.Sleep(*ackDelay)
		m.Ack()
	})
	log.Printf("Receive returned: %v", err)
}
// exporter collects values from exported view data and logs them; see its
// ExportView method for how the fields are used.
type exporter struct {
// prevEnd is the End time of the view data that last caused a log line to
// be written. newExporter initializes it to the creation time.
prevEnd time.Time
// data maps the name captured from a pubsub measure name (for example
// "pull") to the value most recently exported for it since the last reset.
data map[string]int
}
// newExporter returns an exporter with an empty data map whose prevEnd is
// set to the current time.
func newExporter() *exporter {
	e := new(exporter)
	e.prevEnd = time.Now()
	e.data = make(map[string]int)
	return e
}
// re matches the names of the pubsub count measures and captures the part
// between the package path and the "_count" suffix (e.g. "mod_ack").
// The dots in the host name are escaped so that they match only a literal '.'
// rather than any character.
var re = regexp.MustCompile(`cloud\.google\.com/go/pubsub/(.*)_count`)
// ExportView records the value carried by vd when its measure is one of the
// pubsub "*_count" measures matched by re. Whenever the End time of vd falls
// in a different second than e.prevEnd, it first logs one line with the values
// collected so far and starts a fresh collection.
//
// View data whose measure name does not match re, or that has no rows, is
// ignored. Only the first row is read. If that row does not hold
// *stats.SumData the condition is logged and the value is skipped instead of
// panicking on the type assertion.
func (e *exporter) ExportView(vd *stats.ViewData) {
	measure := vd.View.Measure().Name()
	matches := re.FindStringSubmatch(measure)
	if len(matches) == 0 || len(vd.Rows) == 0 {
		return
	}

	// Compare at one-second granularity: view data whose End times fall in
	// the same second are collected into the same log line.
	if vd.End.Unix() != e.prevEnd.Unix() {
		keys := []string{"pull", "ack", "nack", "mod_ack", "stream_open", "stream_retry",
			"stream_request", "stream_response"}
		parts := make([]string, 0, len(keys))
		for _, key := range keys {
			// Shorten the stream_* names for display only; the map is
			// still indexed by the full key.
			name := strings.TrimPrefix(key, "stream_")
			parts = append(parts, fmt.Sprintf("%s:%d", name, e.data[key]))
		}
		log.Printf("%s\n", strings.Join(parts, " "))
		e.prevEnd = vd.End
		e.data = map[string]int{}
	}

	sum, ok := vd.Rows[0].Data.(*stats.SumData)
	if !ok {
		log.Printf("measure %s: unexpected aggregation data type %T", measure, vd.Rows[0].Data)
		return
	}
	e.data[matches[1]] = int(*sum)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment