Created
August 20, 2020 17:48
-
-
Save alexrios/8a64c958ad2dfeae65672bb2afe32ca1 to your computer and use it in GitHub Desktop.
GCP PubSub ~emulator~ client.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"encoding/json" | |
"fmt" | |
"log" | |
"os" | |
"time" | |
"cloud.google.com/go/pubsub" | |
"github.com/spf13/cobra" | |
"google.golang.org/api/option" | |
) | |
func main() { | |
os.Setenv("PUBSUB_EMULATOR_HOST", "localhost:8085") | |
var ( | |
times int | |
projectName string | |
topic string | |
message string | |
subscription string | |
attributes map[string]string | |
) | |
// SEND MESSAGE | |
var cmdSendMessage = &cobra.Command{ | |
Use: "send", | |
Short: "send anything to the topic", | |
Long: `send message to a PubSub topic.`, | |
Args: cobra.NoArgs, | |
Run: func(cmd *cobra.Command, args []string) { | |
ctx, _ := context.WithTimeout(context.Background(), 10*time.Second) | |
client, err := newClient(ctx, projectName) | |
if err != nil { | |
log.Fatal("ERROR:", err) | |
} | |
t := Topic(ctx, client, topic) | |
for i := 0; i < times; i++ { | |
t.Publish(ctx, &pubsub.Message{ | |
Data: []byte(message), | |
Attributes: attributes, | |
}).Get(ctx) | |
} | |
log.Println("mesage", message, "with attributes", attributes, "pusblished on", t.String()) | |
}, | |
} | |
cmdSendMessage.Flags().StringVarP(&message, "data", "d", "", "data to send to the topic") | |
cmdSendMessage.Flags().StringToStringVarP(&attributes, "attributes", "a", map[string]string{}, "message attributes") | |
cmdSendMessage.Flags().IntVarP(×, "times", "n", 1, "repeat the message N times") | |
cmdSendMessage.MarkFlagRequired("data") | |
cmdSendMessage.MarkFlagRequired("attributes") | |
// RECEIVE NEXT MESSAGE | |
var cmdReceiveNextMessage = &cobra.Command{ | |
Use: "receive-next", | |
Short: "receive next message from sub", | |
Long: `receive next message from subscription`, | |
Args: cobra.NoArgs, | |
Run: func(cmd *cobra.Command, args []string) { | |
ctx, _ := context.WithTimeout(context.Background(), 10*time.Second) | |
client, err := newClient(ctx, projectName) | |
if err != nil { | |
log.Fatal(err) | |
} | |
t := Topic(ctx, client, topic) | |
sub := Subscription(ctx, client, t) | |
log.Println("waiting for new messages from ", t.String(), " with sub:", sub.String()) | |
cancellableCtx, cancelFunc := context.WithCancel(ctx) | |
sub.Receive(cancellableCtx, func(ctx context.Context, msg *pubsub.Message) { | |
defer cancelFunc() | |
defer msg.Ack() | |
printMessage(msg) | |
}) | |
}, | |
} | |
cmdReceiveNextMessage.Flags().StringVarP(&subscription, "subscription", "s", "test-sub", "sub to receive the message") | |
var cmdReceiveMessage = &cobra.Command{ | |
Use: "receive", | |
Short: "receive messages from sub", | |
Long: `receive messages from subscription`, | |
Args: cobra.NoArgs, | |
Run: func(cmd *cobra.Command, args []string) { | |
ctx := context.Background() | |
client, err := newClient(ctx, projectName) | |
if err != nil { | |
log.Fatal(err) | |
} | |
t := Topic(ctx, client, topic) | |
sub := Subscription(ctx, client, t) | |
log.Println("waiting for new messages from ", t.String(), " with sub:", sub.String()) | |
sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) { | |
defer msg.Ack() | |
printMessage(msg) | |
}) | |
}, | |
} | |
cmdReceiveMessage.Flags().StringVarP(&subscription, "subscription", "s", "test-sub", "sub to receive the message") | |
//ALL COMMANDS | |
var rootCmd = &cobra.Command{Use: "app"} | |
rootCmd.PersistentFlags().StringVarP(&projectName, "project", "p", "test", "project to receive the message") | |
rootCmd.PersistentFlags().StringVarP(&topic, "topic", "t", "test-topic", "topic to send and receive the message") | |
rootCmd.AddCommand(cmdSendMessage) | |
rootCmd.AddCommand(cmdReceiveNextMessage) | |
rootCmd.AddCommand(cmdReceiveMessage) | |
rootCmd.Execute() | |
} | |
func Subscription(ctx context.Context, client *pubsub.Client, t *pubsub.Topic) *pubsub.Subscription { | |
subID := fmt.Sprintf("%s-sub", t.ID()) | |
sub := client.Subscription(subID) | |
exists, err := sub.Exists(ctx) | |
if err != nil { | |
log.Fatal("ERROR:", err) | |
} | |
if !exists { | |
sub, err = client.CreateSubscription(ctx, subID, pubsub.SubscriptionConfig{Topic: t}) | |
if err != nil { | |
log.Fatal("ERROR:", err) | |
} | |
} | |
return sub | |
} | |
func Topic(ctx context.Context, client *pubsub.Client, topic string) *pubsub.Topic { | |
t := client.Topic(topic) | |
exists, err := t.Exists(ctx) | |
if err != nil { | |
log.Fatal("ERROR:", err) | |
} | |
if !exists { | |
t, err := client.CreateTopic(ctx, t.ID()) | |
if err != nil { | |
log.Fatal(err) | |
} | |
log.Println("Creating topic", t.String()) | |
} | |
return t | |
} | |
func newClient(ctx context.Context, project string) (*pubsub.Client, error) { | |
options := []option.ClientOption{option.WithoutAuthentication()} | |
client, err := pubsub.NewClient(ctx, project, options...) | |
if err != nil { | |
return nil, err | |
} | |
return client, nil | |
} | |
func printMessage(msg *pubsub.Message) { | |
var m = struct{ | |
Data string | |
Attributes map[string]string | |
}{ | |
Data : string(msg.Data), | |
Attributes: msg.Attributes, | |
} | |
jsonBytes, err := json.Marshal(m) | |
if err != nil { | |
log.Fatal(err) | |
} | |
log.Println(string(jsonBytes)) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment