Skip to content

Instantly share code, notes, and snippets.

@ljtill
Last active September 10, 2021 12:42
Show Gist options
  • Save ljtill/caad839fc5383a24323af78f8cc6a087 to your computer and use it in GitHub Desktop.
Save ljtill/caad839fc5383a24323af78f8cc6a087 to your computer and use it in GitHub Desktop.
Provides the ability to interact with Service Bus
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"time"
servicebus "github.com/Azure/azure-service-bus-go"
"github.com/google/uuid"
)
type Payload struct {
TenantId string `json:"tenantId"`
SubscriptionId string `json:"subscriptionId"`
}
func GetConnectionString() string {
connStr := os.Getenv("CONN_STR")
if connStr == "" {
log.Fatal("Undefined environment variable CONN_STR")
}
return connStr
}
func GetQueue() *servicebus.Queue {
connStr := GetConnectionString()
ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connStr))
if err != nil {
log.Fatal(err)
}
q, err := ns.NewQueue("default")
if err != nil {
log.Fatal(err)
}
return q
}
func SendMessage() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
q := GetQueue()
for i := 0; i < 5; i++ {
payload := &Payload{
TenantId: uuid.New().String(),
SubscriptionId: uuid.New().String(),
}
data, err := json.Marshal(payload)
if err != nil {
log.Fatal(err)
}
err = q.Send(ctx, servicebus.NewMessage(data))
if err != nil {
log.Fatal(err)
}
fmt.Println(" ")
fmt.Println("Message Sent")
fmt.Println("Tenant Id -", payload.TenantId)
fmt.Println("Subscription Id -", payload.SubscriptionId)
}
}
func ReceiveMessages() {
rootCtx := context.Background()
ctx, cancel := context.WithTimeout(rootCtx, 10*time.Second)
defer cancel()
q := GetQueue()
var payloads []Payload
var printMessage servicebus.HandlerFunc = func(ctx context.Context, msg *servicebus.Message) error {
var payload Payload
err := json.Unmarshal(msg.Data, &payload)
if err != nil {
log.Fatal(err)
}
payloads = append(payloads, Payload{
TenantId: payload.TenantId,
SubscriptionId: payload.SubscriptionId,
})
return msg.Complete(ctx)
}
err := q.Receive(ctx, printMessage)
if err != nil {
if err.Error() == "context timeout exceeded" {
fmt.Printf("\nInfo: %v\n", err)
} else {
fmt.Printf("Error: %v\n", err)
os.Exit(1)
}
}
for i := 0; i < len(payloads); i++ {
fmt.Println(" ")
fmt.Println("Message Receive")
fmt.Println("Tenant Id -", payloads[i].TenantId)
fmt.Println("Subscription Id -", payloads[i].SubscriptionId)
}
}
func main() {
SendMessage()
ReceiveMessages()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment