Skip to content

Instantly share code, notes, and snippets.

@lukebakken
Last active January 5, 2018 00:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lukebakken/634e350db0ba3e9009ae4d2357652337 to your computer and use it in GitHub Desktop.
Save lukebakken/634e350db0ba3e9009ae4d2357652337 to your computer and use it in GitHub Desktop.
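// This example declares a durable Exchange and a durable (non-auto-delete)
// Queue, binds the Queue to the Exchange with a binding key, and starts two
// consumers on one channel, each started under its own non-global Qos
// (prefetch) setting, that consume every message published to that Exchange
// with that routing key.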
//
package main
import (
"flag"
"fmt"
"github.com/streadway/amqp"
"log"
"time"
)
// Command-line flags. Each variable is a pointer that holds the flag's default
// until flag.Parse runs and the parsed value afterwards.
//
// The queue-name help text describes the queue as durable and non-auto-deleted
// because that is how main declares it (durable=true, delete-when-unused=false).
var (
	uri          = flag.String("uri", "amqp://guest:guest@localhost:5672/", "AMQP URI")
	exchange     = flag.String("exchange", "test-exchange", "Durable, non-auto-deleted AMQP exchange name")
	exchangeType = flag.String("exchange-type", "direct", "Exchange type - direct|fanout|topic|x-custom")
	queueName    = flag.String("queue-name", "test-queue", "Durable, non-auto-deleted AMQP queue name")
	bindingKey   = flag.String("key", "test-key", "AMQP binding key")
	lifetime     = flag.Duration("lifetime", 5*time.Second, "lifetime of process before shutdown (0s=infinite)")
)
// Consumer bundles the AMQP connection and channel a consumer runs on with its
// consumer tag and the channel its delivery handler uses to signal completion.
type Consumer struct {
// conn is the AMQP connection; Shutdown closes it.
conn *amqp.Connection
// channel is the AMQP channel on which Consume and Cancel are issued.
channel *amqp.Channel
// tag is the consumer tag passed to Consume and later to Cancel.
tag string
// done receives one value from handle after its deliveries channel is closed.
done chan error
}
// init parses the command-line flags so the package-level flag variables hold
// their parsed values by the time main runs.
func init() {
flag.Parse()
}
func main() {
var err error
var conn *amqp.Connection
var channel *amqp.Channel
log.Printf("dialing %q", *uri)
conn, err = amqp.Dial(*uri)
if err != nil {
log.Fatalf("Dial: %s", err)
}
log.Printf("got Connection, getting Channel")
channel, err = conn.Channel()
if err != nil {
log.Fatalf("Channel: %s", err)
}
/*
log.Printf("got Channel, setting Qos globally")
err = channel.Qos(1, 0, true)
if err != nil {
log.Fatalf("%s", err)
}
*/
log.Printf("set Qos, declaring Exchange (%q)", *exchange)
if err = channel.ExchangeDeclare(
*exchange, // name of the exchange
*exchangeType, // type
true, // durable
false, // delete when complete
false, // internal
false, // noWait
nil, // arguments
); err != nil {
log.Fatalf("Exchange Declare: %s", err)
}
log.Printf("declared Exchange, declaring Queue %q", *queueName)
queue, err := channel.QueueDeclare(
*queueName, // name of the queue
true, // durable
false, // delete when unused
false, // exclusive
false, // noWait
nil, // arguments
)
if err != nil {
log.Fatalf("Queue Declare: %s", err)
}
log.Printf("declared Queue (%q %d messages, %d consumers), binding to Exchange (key %q)",
queue.Name, queue.Messages, queue.Consumers, *bindingKey)
if err = channel.QueueBind(
queue.Name, // name of the queue
*bindingKey, // bindingKey
*exchange, // sourceExchange
false, // noWait
nil, // arguments
); err != nil {
log.Fatalf("Queue Bind: %s", err)
}
var c1 *Consumer
var c2 *Consumer
log.Printf("got Channel, setting Qos non-globally for consumer-1")
err = channel.Qos(1, 0, false)
if err != nil {
log.Fatalf("%s", err)
}
c1, err = NewConsumer(conn, channel, queue.Name, "consumer-1")
if err != nil {
log.Fatalf("%s", err)
}
log.Printf("got Channel, setting Qos non-globally for consumer-2")
err = channel.Qos(2, 0, false)
if err != nil {
log.Fatalf("%s", err)
}
c2, err = NewConsumer(conn, channel, queue.Name, "consumer-2")
if err != nil {
log.Fatalf("%s", err)
}
if *lifetime > 0 {
log.Printf("running for %s", *lifetime)
time.Sleep(*lifetime)
} else {
log.Printf("running forever")
select {}
}
log.Printf("shutting down")
if err := c1.Shutdown(); err != nil {
log.Fatalf("error during shutdown: %s", err)
}
if err := c2.Shutdown(); err != nil {
log.Fatalf("error during shutdown: %s", err)
}
}
// NewConsumer registers a consumer with tag ctag for queueName on channel and
// starts a goroutine (handle) that logs and acknowledges every delivery.
//
// conn and channel are stored as given; they are not opened here and may be
// shared with other Consumers. Deliveries are consumed with manual
// acknowledgement (noAck=false), non-exclusively.
//
// It returns the Consumer, or a nil Consumer and an error when Consume fails.
func NewConsumer(conn *amqp.Connection, channel *amqp.Channel, queueName, ctag string) (*Consumer, error) {
	// done is buffered (capacity 1) so handle can always deliver its single
	// completion signal and exit, even if nobody ever receives from done.
	c := &Consumer{
		conn:    conn,
		channel: channel,
		tag:     ctag,
		done:    make(chan error, 1),
	}

	log.Printf("Starting Consume (consumer tag %q)", c.tag)
	deliveries, err := c.channel.Consume(
		queueName, // name
		c.tag,     // consumerTag,
		false,     // noAck
		false,     // exclusive
		false,     // noLocal
		false,     // noWait
		nil,       // arguments
	)
	if err != nil {
		return nil, fmt.Errorf("Queue Consume: %s", err)
	}

	// Watch for the connection closing only once Consume has succeeded, so a
	// failed NewConsumer does not leave a watcher goroutine behind.
	closed := c.conn.NotifyClose(make(chan *amqp.Error))
	go func() {
		fmt.Printf("closing: %s", <-closed)
	}()

	go handle(deliveries, c.tag, c.done)
	return c, nil
}
// Shutdown cancels the consumer, closes its AMQP connection, and waits for the
// delivery handler to finish.
//
// Consumers may share one connection and channel, in which case the first
// Shutdown closes the connection for all of them. amqp.ErrClosed from Cancel
// or Close is therefore treated as "already shut down" rather than a failure,
// so a later Shutdown on a sibling Consumer still succeeds.
//
// It returns an error when Cancel or Close fails for any other reason, and
// otherwise the value the handler sends on done.
func (c *Consumer) Shutdown() error {
	// Cancel makes the library close the deliveries channel. When the
	// channel is already closed, handle has been released by that close.
	if err := c.channel.Cancel(c.tag, true); err != nil && err != amqp.ErrClosed {
		return fmt.Errorf("Consumer cancel failed: %s", err)
	}

	if err := c.conn.Close(); err != nil && err != amqp.ErrClosed {
		return fmt.Errorf("AMQP connection close error: %s", err)
	}

	// Logged on return, i.e. after handle() has signalled that it exited.
	defer log.Printf("AMQP shutdown OK")

	// wait for handle() to exit
	return <-c.done
}
// handle logs and acknowledges every delivery received on deliveries until
// that channel is closed, then sends nil on done to signal that it has exited.
//
// ctag is only used to label the log lines. A failed acknowledgement is
// logged and the loop keeps going with the next delivery.
func handle(deliveries <-chan amqp.Delivery, ctag string, done chan error) {
	for d := range deliveries {
		log.Printf(
			"ctag [%s] got %dB delivery: [%v] %q",
			ctag,
			len(d.Body),
			d.DeliveryTag,
			d.Body,
		)
		// Acknowledge only this delivery (multiple=false).
		if err := d.Ack(false); err != nil {
			log.Printf("ctag [%s] ack of delivery [%v] failed: %s", ctag, d.DeliveryTag, err)
		}
	}
	log.Printf("handle: deliveries channel closed")
	done <- nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment