Skip to content

Instantly share code, notes, and snippets.

@Shazambom
Last active July 31, 2023 00:03
Show Gist options
  • Save Shazambom/66111731ef50df83e7126e6c28aa7c04 to your computer and use it in GitHub Desktop.
Save Shazambom/66111731ef50df83e7126e6c28aa7c04 to your computer and use it in GitHub Desktop.
An example of how to use RabbitMQ's Direct Reply-To feature in Go, using github.com/streadway/amqp
package main
import (
"errors"
"fmt"
"github.com/streadway/amqp"
"log"
"math/rand"
"strconv"
)
// main starts the demo client and server concurrently, then prints the first
// value the server reports followed by the first value the client reports.
func main() {
	replies := make(chan string)
	served := make(chan string)

	go client(replies)
	go server(served)

	// The server's report is printed first, then the client's.
	fmt.Println(<-served)
	fmt.Println(<-replies)
}
// declareQueue dials the RabbitMQ broker at amqp://guest:guest@localhost:5672/,
// opens a channel on the new connection, and declares a queue on that channel.
//
// name, durable, autoDelete, exclusive, noWait and args are passed straight
// through to (*amqp.Channel).QueueDeclare.
//
// On success the caller owns both the returned channel and connection and is
// responsible for closing them. On any failure both are returned as nil and
// whatever had been opened so far has already been closed, so callers that
// simply return on error do not leak a connection.
func declareQueue(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (*amqp.Channel, *amqp.Connection, error) {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		return nil, nil, err
	}

	ch, err := conn.Channel()
	if err != nil {
		// Best-effort cleanup; the channel error is the one worth reporting.
		_ = conn.Close()
		return nil, nil, fmt.Errorf("Error getting channel: %w", err)
	}
	if ch == nil {
		_ = conn.Close()
		return nil, nil, errors.New("Error getting channel")
	}

	if _, err = ch.QueueDeclare(
		name,       // name
		durable,    // durable
		autoDelete, // delete when unused
		exclusive,  // exclusive
		noWait,     // noWait
		args,       // arguments
	); err != nil {
		// Best-effort cleanup; the declare error is the one worth reporting.
		_ = ch.Close()
		_ = conn.Close()
		return nil, nil, err
	}
	return ch, conn, nil
}
// client sends one request to the "foo" queue using RabbitMQ's Direct
// Reply-To mechanism and forwards the body of every reply it receives to
// result.
//
// If connecting, consuming, or publishing fails, the error text is logged and
// sent on result, and the function returns. If the reply stream ends without
// delivering any reply, a message saying so is sent on result instead, so the
// receiver is never left waiting silently.
func client(result chan string) {
	ch, conn, err := declareQueue("foo", false, false, false, false, nil)
	if err != nil {
		log.Println(err.Error())
		result <- err.Error()
		return
	}
	// Deferred calls run last-in-first-out: registering the connection first
	// means the channel is closed before the connection it belongs to.
	defer conn.Close()
	defer ch.Close()

	// The reply consumer is registered before publishing, on the same channel
	// that publishes the request, and in auto-ack mode.
	// NOTE(review): these are the documented Direct Reply-To requirements -
	// confirm against the RabbitMQ docs for the broker version in use.
	msgs, err := ch.Consume(
		"amq.rabbitmq.reply-to", // queue
		"ReplyToConsumer",       // consumer
		true,                    // auto-ack
		false,                   // exclusive
		false,                   // no-local
		false,                   // no-wait
		nil,                     // args
	)
	if err != nil {
		log.Printf("consume fail: %v", err)
		result <- err.Error()
		return
	}

	err = ch.Publish(
		"",    // exchange
		"foo", // routing key
		false, // mandatory
		false, // immediate
		amqp.Publishing{
			ContentType:   "application/json",
			CorrelationId: getCorrelationId(),
			Body:          []byte("FOO"),
			ReplyTo:       "amq.rabbitmq.reply-to",
		})
	if err != nil {
		log.Printf("publish fail: %v", err)
		result <- err.Error()
		return
	}

	replied := false
	for m := range msgs {
		replied = true
		result <- string(m.Body)
	}
	if !replied {
		// The delivery channel was closed before anything arrived.
		result <- "reply consumer closed before a reply arrived"
	}
}
// server consumes requests from the "foo" queue. For each request it
// publishes a "BAR" reply to the address in the request's ReplyTo property,
// acknowledges the request, and sends the request body on result.
//
// The reply carries the request's own correlation ID so the requester can
// pair the reply with the request it sent.
//
// If connecting or consuming fails, the error text is logged and sent on
// result, and the function returns.
func server(result chan string) {
	ch, conn, err := declareQueue("foo", false, false, false, false, nil)
	if err != nil {
		log.Println(err.Error())
		result <- err.Error()
		return
	}
	// Deferred calls run last-in-first-out: registering the connection first
	// means the channel is closed before the connection it belongs to.
	defer conn.Close()
	defer ch.Close()

	msgs, err := ch.Consume(
		"foo",             // queue
		"ReplyToConsumer", // consumer
		false,             // auto-ack
		false,             // exclusive
		false,             // no-local
		false,             // no-wait
		nil,               // args
	)
	if err != nil {
		log.Printf("consume fail: %v", err)
		result <- err.Error()
		return
	}

	for m := range msgs {
		if m.ReplyTo == "" {
			// Without a reply address there is nowhere to route the answer;
			// the request is still acknowledged and reported below.
			log.Printf("publish skipped: request has no reply-to address")
		} else {
			err = ch.Publish(
				"",        // exchange
				m.ReplyTo, // routing key
				false,     // mandatory
				false,     // immediate
				amqp.Publishing{
					ContentType:   "application/json",
					CorrelationId: m.CorrelationId, // echo the request's ID
					Body:          []byte("BAR"),
				})
			if err != nil {
				log.Printf("publish fail: %v", err)
			}
		}

		// Manual ack, because the consumer was registered with auto-ack off.
		if err = ch.Ack(m.DeliveryTag, false); err != nil {
			log.Printf("ack fail: %v", err)
		}
		result <- string(m.Body)
	}
}
// getCorrelationId returns a pseudo-random decimal string in the range
// [0, 9999999999) for use as an AMQP correlation ID.
//
// The value comes from math/rand, so it is not suitable where unpredictability
// matters. The bound is passed as an int64 so the code also compiles on
// platforms where int is 32 bits wide.
func getCorrelationId() string {
	const limit int64 = 9999999999
	return strconv.FormatInt(rand.Int63n(limit), 10)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment