Skip to content

Instantly share code, notes, and snippets.

@alexrios
Created August 20, 2020 17:48
Show Gist options
  • Save alexrios/8a64c958ad2dfeae65672bb2afe32ca1 to your computer and use it in GitHub Desktop.
Save alexrios/8a64c958ad2dfeae65672bb2afe32ca1 to your computer and use it in GitHub Desktop.
GCP PubSub ~emulator~ client.
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"time"
"cloud.google.com/go/pubsub"
"github.com/spf13/cobra"
"google.golang.org/api/option"
)
func main() {
os.Setenv("PUBSUB_EMULATOR_HOST", "localhost:8085")
var (
times int
projectName string
topic string
message string
subscription string
attributes map[string]string
)
// SEND MESSAGE
var cmdSendMessage = &cobra.Command{
Use: "send",
Short: "send anything to the topic",
Long: `send message to a PubSub topic.`,
Args: cobra.NoArgs,
Run: func(cmd *cobra.Command, args []string) {
ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
client, err := newClient(ctx, projectName)
if err != nil {
log.Fatal("ERROR:", err)
}
t := Topic(ctx, client, topic)
for i := 0; i < times; i++ {
t.Publish(ctx, &pubsub.Message{
Data: []byte(message),
Attributes: attributes,
}).Get(ctx)
}
log.Println("mesage", message, "with attributes", attributes, "pusblished on", t.String())
},
}
cmdSendMessage.Flags().StringVarP(&message, "data", "d", "", "data to send to the topic")
cmdSendMessage.Flags().StringToStringVarP(&attributes, "attributes", "a", map[string]string{}, "message attributes")
cmdSendMessage.Flags().IntVarP(&times, "times", "n", 1, "repeat the message N times")
cmdSendMessage.MarkFlagRequired("data")
cmdSendMessage.MarkFlagRequired("attributes")
// RECEIVE NEXT MESSAGE
var cmdReceiveNextMessage = &cobra.Command{
Use: "receive-next",
Short: "receive next message from sub",
Long: `receive next message from subscription`,
Args: cobra.NoArgs,
Run: func(cmd *cobra.Command, args []string) {
ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
client, err := newClient(ctx, projectName)
if err != nil {
log.Fatal(err)
}
t := Topic(ctx, client, topic)
sub := Subscription(ctx, client, t)
log.Println("waiting for new messages from ", t.String(), " with sub:", sub.String())
cancellableCtx, cancelFunc := context.WithCancel(ctx)
sub.Receive(cancellableCtx, func(ctx context.Context, msg *pubsub.Message) {
defer cancelFunc()
defer msg.Ack()
printMessage(msg)
})
},
}
cmdReceiveNextMessage.Flags().StringVarP(&subscription, "subscription", "s", "test-sub", "sub to receive the message")
var cmdReceiveMessage = &cobra.Command{
Use: "receive",
Short: "receive messages from sub",
Long: `receive messages from subscription`,
Args: cobra.NoArgs,
Run: func(cmd *cobra.Command, args []string) {
ctx := context.Background()
client, err := newClient(ctx, projectName)
if err != nil {
log.Fatal(err)
}
t := Topic(ctx, client, topic)
sub := Subscription(ctx, client, t)
log.Println("waiting for new messages from ", t.String(), " with sub:", sub.String())
sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
defer msg.Ack()
printMessage(msg)
})
},
}
cmdReceiveMessage.Flags().StringVarP(&subscription, "subscription", "s", "test-sub", "sub to receive the message")
//ALL COMMANDS
var rootCmd = &cobra.Command{Use: "app"}
rootCmd.PersistentFlags().StringVarP(&projectName, "project", "p", "test", "project to receive the message")
rootCmd.PersistentFlags().StringVarP(&topic, "topic", "t", "test-topic", "topic to send and receive the message")
rootCmd.AddCommand(cmdSendMessage)
rootCmd.AddCommand(cmdReceiveNextMessage)
rootCmd.AddCommand(cmdReceiveMessage)
rootCmd.Execute()
}
func Subscription(ctx context.Context, client *pubsub.Client, t *pubsub.Topic) *pubsub.Subscription {
subID := fmt.Sprintf("%s-sub", t.ID())
sub := client.Subscription(subID)
exists, err := sub.Exists(ctx)
if err != nil {
log.Fatal("ERROR:", err)
}
if !exists {
sub, err = client.CreateSubscription(ctx, subID, pubsub.SubscriptionConfig{Topic: t})
if err != nil {
log.Fatal("ERROR:", err)
}
}
return sub
}
func Topic(ctx context.Context, client *pubsub.Client, topic string) *pubsub.Topic {
t := client.Topic(topic)
exists, err := t.Exists(ctx)
if err != nil {
log.Fatal("ERROR:", err)
}
if !exists {
t, err := client.CreateTopic(ctx, t.ID())
if err != nil {
log.Fatal(err)
}
log.Println("Creating topic", t.String())
}
return t
}
func newClient(ctx context.Context, project string) (*pubsub.Client, error) {
options := []option.ClientOption{option.WithoutAuthentication()}
client, err := pubsub.NewClient(ctx, project, options...)
if err != nil {
return nil, err
}
return client, nil
}
func printMessage(msg *pubsub.Message) {
var m = struct{
Data string
Attributes map[string]string
}{
Data : string(msg.Data),
Attributes: msg.Attributes,
}
jsonBytes, err := json.Marshal(m)
if err != nil {
log.Fatal(err)
}
log.Println(string(jsonBytes))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment