Skip to content

Instantly share code, notes, and snippets.

@jba
Created August 28, 2017 13:06
Show Gist options
  • Save jba/99879125dfe20bc62a1d82b47c89b730 to your computer and use it in GitHub Desktop.
Save jba/99879125dfe20bc62a1d82b47c89b730 to your computer and use it in GitHub Desktop.
// Tool to publish messages at intervals, or receive messages.
package main
import (
"flag"
"fmt"
"log"
"os"
"sync/atomic"
"time"
"cloud.google.com/go/pubsub"
"golang.org/x/net/context"
)
var (
projectID = flag.String("project", "", "project ID")
topic = flag.String("topic", "", "topic name")
// for publish
count = flag.Int("count", 1, "number to publish in one burst")
interval = flag.Duration("interval", 1*time.Second, "time between bursts")
// for receive
subscription = flag.String("sub", "", "subscription")
each = flag.Bool("each", false, "print each message")
)
func main() {
cmd := os.Args[1]
copy(os.Args[1:], os.Args[2:])
flag.Parse()
if *projectID == "" {
log.Fatal("need -project")
}
if *topic == "" {
log.Fatal("need -topic")
}
ctx := context.Background()
c, err := pubsub.NewClient(ctx, *projectID)
if err != nil {
log.Fatal(err)
}
defer c.Close()
t := c.Topic(*topic)
switch cmd {
case "publish":
doPublish(ctx, c, t)
case "receive":
doReceive(ctx, c, t)
default:
log.Fatalf("unknown action %q", cmd)
}
}
func doPublish(ctx context.Context, c *pubsub.Client, t *pubsub.Topic) {
s := 0
for {
log.Printf("publishing %d messages", *count)
if err := publishN(ctx, t, *count, s); err != nil {
log.Fatal(err)
}
s += *count
log.Printf("sleeping %s", *interval)
time.Sleep(*interval)
}
}
// Publish N messages and wait for them to be sent.
// Message data is integers beginning from start.
func publishN(ctx context.Context, t *pubsub.Topic, n, start int) error {
var rs []*pubsub.PublishResult
for i := 0; i < n; i++ {
r := t.Publish(ctx, &pubsub.Message{
Data: []byte(fmt.Sprintf("%d", start+i)),
})
rs = append(rs, r)
}
for _, r := range rs {
_, err := r.Get(ctx)
if err != nil {
return err
}
}
return nil
}
func doReceive(ctx context.Context, c *pubsub.Client, t *pubsub.Topic) {
if *subscription == "" {
log.Fatal("need -sub")
}
sub := c.Subscription(*subscription)
var nmsgs int64 // atomic
go func() {
time.Sleep(10 * time.Second)
for {
log.Printf("received %d messages", atomic.LoadInt64(&nmsgs))
time.Sleep(*interval)
}
}()
log.Fatal(sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
if *each {
log.Printf("received %s\n", string(m.Data))
}
atomic.AddInt64(&nmsgs, 1)
m.Ack()
}))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment