Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save billhathaway/bf2e3b3bc6d7203d59831bf5c3e0c725 to your computer and use it in GitHub Desktop.
Save billhathaway/bf2e3b3bc6d7203d59831bf5c3e0c725 to your computer and use it in GitHub Desktop.
pubsub utility routines
package psutil
import (
"fmt"
"log"
"strings"
"cloud.google.com/go/pubsub"
netcontext "golang.org/x/net/context"
)
const alreadyExists = "AlreadyExists"
// createSubscriptions creates a subscription for each topic named topic + suffix
func createSubscriptions(client *pubsub.Client, topics []string, suffix string) error {
ctx := netcontext.Background()
for _, topic := range topics {
subName := topic + suffix
sub := client.Subscription(subName)
ok, err := sub.Exists(ctx)
if err != nil {
return err
}
if ok {
continue
}
cfg, err := sub.Config(ctx)
if err != nil {
return err
}
_, err = client.CreateSubscription(ctx, subName, cfg)
if err != nil {
return err
}
}
return nil
}
// createTopics will create all the topics, errors due to existing are ignored
func createTopics(client *pubsub.Client, topics []string) error {
ctx := netcontext.Background()
for _, topic := range topics {
_, err := client.CreateTopic(ctx, topic)
if err != nil {
if !strings.Contains(err.Error(), alreadyExists) {
return err
}
}
}
return nil
}
// subListener receives messages and passes them into the channel, spawning a goroutine
func subListener(ctx netcontext.Context, client *pubsub.Client, id string) (chan *pubsub.Message, error) {
sub := client.Subscription(id)
ok, err := sub.Exists(ctx)
if err != nil {
return nil, err
}
if !ok {
return nil, fmt.Errorf("subscription %s does not exist", id)
}
c := make(chan *pubsub.Message)
go func() {
err := sub.Receive(ctx, func(ctx netcontext.Context, m *pubsub.Message) {
c <- m
})
close(c)
log.Printf("op=subListener sub=%s error=%s\n", id, err)
return
}()
return c, nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment