Skip to content

Instantly share code, notes, and snippets.

@Tinitto
Created May 11, 2021 17:25
Show Gist options
  • Save Tinitto/d2c24b14486618398c9b4d4dbccb4a86 to your computer and use it in GitHub Desktop.
Save Tinitto/d2c24b14486618398c9b4d4dbccb4a86 to your computer and use it in GitHub Desktop.
How to Connect to an AMQP 1.0 Topic (Not a Queue) in Golang
/*
* While trying to use https://github.com/Azure/go-amqp/ to connect to an AMQP topic, I followed the sample code
* in the README but kept getting an error like:
* "Failed creating receiver link *Error{Condition: amqp:unauthorized-access, Description: User qbtzf4r2sqkimdl is not authorized to read from: queue://topic..."
* To connect to an AMQP 1.0 topic (rather than a queue), the LinkSourceAddress or the LinkTargetAddress has to be of the form "topic://..."
*/
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/Azure/go-amqp"
)
// main runs the example and terminates the process with a logged error if any
// AMQP operation fails.
func main() {
	if err := run(context.Background()); err != nil {
		log.Fatal(err)
	}
}

// run dials the AMQP server, opens a session, sends a single "Hello!" message
// to topic://topic-name and then receives from the same topic until an
// operation fails. Because the receive loop never exits on its own, run only
// ever returns a non-nil error.
//
// The work lives here rather than in main so that the deferred Close calls
// execute before the process exits: log.Fatal calls os.Exit, which skips
// deferred functions.
func run(ctx context.Context) error {
	// Create client
	client, err := amqp.Dial("amqps://my-namespace.servicebus.windows.net",
		amqp.ConnSASLPlain("access-key-name", "access-key"),
	)
	if err != nil {
		return fmt.Errorf("Dialing AMQP server: %w", err)
	}
	defer client.Close()

	// Open a session
	session, err := client.NewSession()
	if err != nil {
		return fmt.Errorf("Creating AMQP session: %w", err)
	}

	// Send a message
	{
		// Create a sender. The "topic://" prefix is what makes the broker
		// treat the address as a topic rather than a queue.
		sender, err := session.NewSender(
			amqp.LinkTargetAddress("topic://topic-name"),
		)
		if err != nil {
			return fmt.Errorf("Creating sender link: %w", err)
		}

		// One 5 second budget covers both sending and closing the link.
		sendCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
		sendErr := sender.Send(sendCtx, amqp.NewMessage([]byte("Hello!")))
		// Close the link even when the send failed so it is not left open.
		closeErr := sender.Close(sendCtx)
		cancel()
		if sendErr != nil {
			return fmt.Errorf("Sending message: %w", sendErr)
		}
		if closeErr != nil {
			// The message was already sent; a close failure is only reported.
			log.Printf("Closing sender link: %v", closeErr)
		}
	}

	// Continuously read messages
	receiver, err := session.NewReceiver(
		amqp.LinkSourceAddress("topic://topic-name"),
		amqp.LinkCredit(10),
	)
	if err != nil {
		return fmt.Errorf("Creating receiver link: %w", err)
	}
	defer func() {
		closeCtx, cancel := context.WithTimeout(ctx, 1*time.Second)
		defer cancel()
		if err := receiver.Close(closeCtx); err != nil {
			log.Printf("Closing receiver link: %v", err)
		}
	}()

	for {
		// Receive next message
		msg, err := receiver.Receive(ctx)
		if err != nil {
			return fmt.Errorf("Reading message from AMQP: %w", err)
		}

		// Accept message; a failed disposition must not go unnoticed.
		if err := msg.Accept(ctx); err != nil {
			return fmt.Errorf("Accepting message: %w", err)
		}

		fmt.Printf("Message received: %s\n", msg.GetData())
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment