Skip to content

Instantly share code, notes, and snippets.

@pfortin-urbn
Last active May 18, 2018 13:04
Show Gist options
  • Save pfortin-urbn/a78bff67cdb32343ba4efeb52dff95ec to your computer and use it in GitHub Desktop.
Save pfortin-urbn/a78bff67cdb32343ba4efeb52dff95ec to your computer and use it in GitHub Desktop.
PubSub Example
// Google Pub/Sub Info
PubSubProjectID string `split_words:"true" default:"ecomm-dev-poc"`
PubSubTopicName string `split_words:"true" default:"orderfulfillment-DEV"`
PubSubSubscriptionName string `split_words:"true" default:"tibco-subscription-DEV"`
PubSubPublishTimeout time.Duration `split_words:"true" default:"2s"`
GoogleCredentials string `split_words:"true" required:"true"`
// Set GOOGLE_CREDENTIALS to the contents of the service-account key file, e.g. GOOGLE_CREDENTIALS=`cat credentials.json`
package clients
import (
"context"
"time"
"os"
"cloud.google.com/go/pubsub"
"github.com/urbn/orderfulfillment/app"
"github.com/urbn/orderfulfillment/app/logging"
"golang.org/x/oauth2/google"
"google.golang.org/api/option"
)
// logger is the package-level logger, aliased from logging.Logger.
var logger = logging.Logger
// PubSubClient bundles a Pub/Sub client with the settings needed to publish
// messages to a single topic.
type PubSubClient struct {
// Client is the underlying Pub/Sub client used to obtain topic handles.
Client app.PubSubI
// PubSubProjectID is the project the client was created for.
PubSubProjectID string
// PubSubTopicName is the topic PublishMessage publishes to.
PubSubTopicName string
// PubSubPublishTimeout bounds how long PublishMessage waits for a publish to complete.
PubSubPublishTimeout time.Duration
}
// NewClientProc is the constructor CreatePubSubClient calls to open a Pub/Sub
// client. It defaults to NewClient; being a variable, it can be reassigned
// (presumably so tests can substitute their own constructor — confirm).
var NewClientProc = NewClient
// NewClient opens a Pub/Sub client for pubSubProjectID using a background
// context.
//
// Every supplied option is forwarded to pubsub.NewClient. Calling NewClient
// without any options is valid, in which case pubsub.NewClient applies its own
// defaults.
func NewClient(pubSubProjectID string, opts ...option.ClientOption) (*pubsub.Client, error) {
	// Forward the whole variadic list: indexing opts[0] would panic on an empty
	// list and silently discard any option after the first.
	return pubsub.NewClient(context.Background(), pubSubProjectID, opts...)
}
// CreatePubSubClient builds a PubSubClient from the application configuration.
//
// It parses configuration.GoogleCredentials as a JSON key scoped to Pub/Sub,
// opens a client through NewClientProc, and makes sure the configured topic
// exists before returning.
//
// NOTE: a failure to parse the credentials or to open the client is logged and
// terminates the process with exit status 1. Only a failure while ensuring the
// topic exists is returned to the caller; in that case the client that was
// opened is closed before returning.
func CreatePubSubClient(configuration *app.Specification) (PubSubClient, error) {
	conf, err := google.JWTConfigFromJSON([]byte(configuration.GoogleCredentials), "https://www.googleapis.com/auth/pubsub")
	if err != nil {
		logger.Errorf("%s: %s", "Could not create pubsub configuration, error", err.Error())
		os.Exit(1)
	}

	ctx := context.Background()
	ts := conf.TokenSource(ctx)
	client, err := NewClientProc(configuration.PubSubProjectID, option.WithTokenSource(ts))
	if err != nil {
		logger.Errorf("%s: %s", "Could not create pubsub connection, error", err.Error())
		os.Exit(1)
	}

	if err = createTopicIfNotExists(client, configuration.PubSubTopicName); err != nil {
		// The client is not handed to the caller on this path, so release it
		// here instead of leaking its connections.
		if closeErr := client.Close(); closeErr != nil {
			logger.Errorf("%s: %s", "Could not close pubsub connection, error", closeErr.Error())
		}
		return PubSubClient{}, err
	}

	pubSubClient := PubSubClient{
		Client:               client,
		PubSubProjectID:      configuration.PubSubProjectID,
		PubSubPublishTimeout: configuration.PubSubPublishTimeout,
		PubSubTopicName:      configuration.PubSubTopicName,
	}
	return pubSubClient, nil
}
// createTopicIfNotExists makes sure a topic named topicName exists for the
// client's project, creating it when the existence check reports it missing.
//
// It returns the error from the existence check or from the creation call, and
// nil when the topic already existed or was created. A successful creation is
// logged at info level.
func createTopicIfNotExists(c *pubsub.Client, topicName string) error {
	ctx := context.Background()

	// A failed existence check must not be mistaken for "topic is missing":
	// surface it rather than attempting a creation on unknown state.
	exists, err := c.Topic(topicName).Exists(ctx)
	if err != nil {
		return err
	}
	if exists {
		return nil
	}

	// Only the error matters here; the returned topic handle is not needed.
	if _, err := c.CreateTopic(ctx, topicName); err != nil {
		return err
	}
	logger.Infof("Created topic %s", topicName)
	return nil
}
// PublishMessage publishes data as a single message to pc.PubSubTopicName and
// waits for the publish result, giving up after pc.PubSubPublishTimeout.
//
// It returns nil once the publish result is obtained without error; otherwise
// the error is logged and returned.
func (pc PubSubClient) PublishMessage(data []byte) error {
	ctx, cancel := context.WithTimeout(context.Background(), pc.PubSubPublishTimeout)
	defer cancel()

	// NOTE(review): a fresh topic handle is obtained on every call and never
	// stopped; consider caching one handle per client — confirm against the
	// pubsub Topic documentation.
	topic := pc.Topic(pc.PubSubTopicName)
	result := pc.Publish(topic, ctx, &pubsub.Message{Data: data})

	if _, err := result.Get(ctx); err != nil {
		// Pass the error text as an argument, never as the format string, so a
		// '%' inside the message cannot be interpreted as a formatting verb.
		logger.Printf("%s", err.Error())
		return err
	}
	return nil
}
// Topic returns a handle for the topic named topicName by delegating to the
// wrapped Client.
func (pc PubSubClient) Topic(topicName string) *pubsub.Topic {
	handle := pc.Client.Topic(topicName)
	return handle
}
// Publish hands msg to topic for publishing under ctx and returns the pending
// result. It is a thin pass-through to topic.Publish and does not read any
// state from pc.
func (pc PubSubClient) Publish(topic *pubsub.Topic, ctx context.Context, msg *pubsub.Message) *pubsub.PublishResult {
	pending := topic.Publish(ctx, msg)
	return pending
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment