Skip to content

Instantly share code, notes, and snippets.

@dselans
Created January 20, 2020 04:26
Show Gist options
  • Save dselans/77dc432c7d57aa94bc7c0ca67070f539 to your computer and use it in GitHub Desktop.
Save dselans/77dc432c7d57aa94bc7c0ca67070f539 to your computer and use it in GitHub Desktop.
RabbitMQ header exchange publisher and consumer example in Go
// Quick example of using the headers exchange with the streadway/amqp lib.
//
// NOTE: I couldn't find any examples of this anywhere, so maybe this will help
// someone else.
//
// Run consumer in one terminal: $ go run main.go
// Run publisher in another terminal: $ go run main.go -action publisher
//
// The consumer should have received a message.
//
// To see that header routing is working as expected, specify a different header
// in the Publish call -- the message should no longer be received by the consumer.
//
package main
import (
"flag"
"fmt"
"log"
"github.com/pkg/errors"
"github.com/streadway/amqp"
)
const (
ConsumerAction = "consumer"
PublisherAction = "publisher"
)
var (
action = flag.String("action", ConsumerAction, "publisher or consumer")
rabbitURL = flag.String("url", "amqp://localhost", "rabbitmq url")
exchangeName = flag.String("headers", "headers", "name of the exchange")
queueName = flag.String("queue", "header-queue", "name of the queue (only needed for consumer)")
)
type Options struct {
URL string
ExchangeName string
ExchangeType string
QueueName string
RoutingKey string
}
func main() {
flag.Parse()
ch, err := connect(*action, &Options{
*rabbitURL,
*exchangeName,
"headers",
*queueName,
"",
})
if err != nil {
log.Fatalf("unable to connect to rabbitmq: ")
}
switch *action {
case PublisherAction:
if err := ch.Publish(
*exchangeName,
"", // routing key is not used w/ headers exchange
false,
false,
amqp.Publishing{
Headers: amqp.Table{
"type": "foo",
},
Body: []byte("test message contents"),
},
); err != nil {
log.Fatalf("unable to publish message: %s", err)
}
fmt.Println("Published message")
case ConsumerAction:
rabbitChan, err := ch.Consume(
"transform",
"consume/main.go",
true,
false,
false,
false,
nil,
)
if err != nil {
log.Fatalf("unable to start consumption of messages: %s", err)
}
fmt.Println("Running in consumer mode")
for {
msg := <-rabbitChan
fmt.Printf("Received a message w/ body: %+v Headers: %+v\n", string(msg.Body), msg.Headers)
}
default:
log.Fatalf("unrecognized action '%s'", *action)
}
}
func connect(action string, opts *Options) (*amqp.Channel, error) {
ac, err := amqp.Dial(opts.URL)
if err != nil {
return nil, err
}
ch, err := ac.Channel()
if err != nil {
return nil, errors.Wrap(err, "Channel instantiation failure")
}
if err := ch.ExchangeDeclare(
opts.ExchangeName,
opts.ExchangeType,
true,
false,
false,
false,
nil,
); err != nil {
return nil, errors.Wrap(err, "unable to declare exchange")
}
if action == ConsumerAction {
if _, err = ch.QueueDeclare(
opts.QueueName,
true,
false,
false,
false,
nil,
); err != nil {
return nil, err
}
if err := ch.QueueBind(
opts.QueueName,
opts.RoutingKey,
opts.ExchangeName,
false,
amqp.Table{
"x-match": "all",
"type": "foo",
},
); err != nil {
return nil, errors.Wrap(err, "unable to bind queue")
}
}
return ch, nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment