Created
June 10, 2017 17:17
-
-
Save billhathaway/bf2e3b3bc6d7203d59831bf5c3e0c725 to your computer and use it in GitHub Desktop.
pubsub utility routines
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package psutil | |
import ( | |
"fmt" | |
"log" | |
"strings" | |
"cloud.google.com/go/pubsub" | |
netcontext "golang.org/x/net/context" | |
) | |
const alreadyExists = "AlreadyExists" | |
// createSubscriptions creates a subscription for each topic named topic + suffix | |
func createSubscriptions(client *pubsub.Client, topics []string, suffix string) error { | |
ctx := netcontext.Background() | |
for _, topic := range topics { | |
subName := topic + suffix | |
sub := client.Subscription(subName) | |
ok, err := sub.Exists(ctx) | |
if err != nil { | |
return err | |
} | |
if ok { | |
continue | |
} | |
cfg, err := sub.Config(ctx) | |
if err != nil { | |
return err | |
} | |
_, err = client.CreateSubscription(ctx, subName, cfg) | |
if err != nil { | |
return err | |
} | |
} | |
return nil | |
} | |
// createTopics will create all the topics, errors due to existing are ignored | |
func createTopics(client *pubsub.Client, topics []string) error { | |
ctx := netcontext.Background() | |
for _, topic := range topics { | |
_, err := client.CreateTopic(ctx, topic) | |
if err != nil { | |
if !strings.Contains(err.Error(), alreadyExists) { | |
return err | |
} | |
} | |
} | |
return nil | |
} | |
// subListener receives messages and passes them into the channel, spawning a goroutine | |
func subListener(ctx netcontext.Context, client *pubsub.Client, id string) (chan *pubsub.Message, error) { | |
sub := client.Subscription(id) | |
ok, err := sub.Exists(ctx) | |
if err != nil { | |
return nil, err | |
} | |
if !ok { | |
return nil, fmt.Errorf("subscription %s does not exist", id) | |
} | |
c := make(chan *pubsub.Message) | |
go func() { | |
err := sub.Receive(ctx, func(ctx netcontext.Context, m *pubsub.Message) { | |
c <- m | |
}) | |
close(c) | |
log.Printf("op=subListener sub=%s error=%s\n", id, err) | |
return | |
}() | |
return c, nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment