Last active
January 5, 2018 00:43
-
-
Save lukebakken/634e350db0ba3e9009ae4d2357652337 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// This example declares a durable Exchange and a durable (non-auto-delete)
// Queue, binds the Queue to the Exchange with a binding key, and starts two
// consumers on that Queue which together consume every message published to
// that Exchange with that routing key.
// | |
package main | |
import ( | |
"flag" | |
"fmt" | |
"github.com/streadway/amqp" | |
"log" | |
"time" | |
) | |
// Command-line flags. The pointers hold the defaults until flag.Parse runs.
//
// The queue-name help text describes the queue as durable because main
// declares it with durable=true and auto-delete=false; it is not ephemeral.
var (
	uri          = flag.String("uri", "amqp://guest:guest@localhost:5672/", "AMQP URI")
	exchange     = flag.String("exchange", "test-exchange", "Durable, non-auto-deleted AMQP exchange name")
	exchangeType = flag.String("exchange-type", "direct", "Exchange type - direct|fanout|topic|x-custom")
	queueName    = flag.String("queue-name", "test-queue", "Durable, non-auto-deleted AMQP queue name")
	bindingKey   = flag.String("key", "test-key", "AMQP binding key")
	lifetime     = flag.Duration("lifetime", 5*time.Second, "lifetime of process before shutdown (0s=infinite)")
)
type Consumer struct { | |
conn *amqp.Connection | |
channel *amqp.Channel | |
tag string | |
done chan error | |
} | |
func init() { | |
flag.Parse() | |
} | |
func main() { | |
var err error | |
var conn *amqp.Connection | |
var channel *amqp.Channel | |
log.Printf("dialing %q", *uri) | |
conn, err = amqp.Dial(*uri) | |
if err != nil { | |
log.Fatalf("Dial: %s", err) | |
} | |
log.Printf("got Connection, getting Channel") | |
channel, err = conn.Channel() | |
if err != nil { | |
log.Fatalf("Channel: %s", err) | |
} | |
/* | |
log.Printf("got Channel, setting Qos globally") | |
err = channel.Qos(1, 0, true) | |
if err != nil { | |
log.Fatalf("%s", err) | |
} | |
*/ | |
log.Printf("set Qos, declaring Exchange (%q)", *exchange) | |
if err = channel.ExchangeDeclare( | |
*exchange, // name of the exchange | |
*exchangeType, // type | |
true, // durable | |
false, // delete when complete | |
false, // internal | |
false, // noWait | |
nil, // arguments | |
); err != nil { | |
log.Fatalf("Exchange Declare: %s", err) | |
} | |
log.Printf("declared Exchange, declaring Queue %q", *queueName) | |
queue, err := channel.QueueDeclare( | |
*queueName, // name of the queue | |
true, // durable | |
false, // delete when unused | |
false, // exclusive | |
false, // noWait | |
nil, // arguments | |
) | |
if err != nil { | |
log.Fatalf("Queue Declare: %s", err) | |
} | |
log.Printf("declared Queue (%q %d messages, %d consumers), binding to Exchange (key %q)", | |
queue.Name, queue.Messages, queue.Consumers, *bindingKey) | |
if err = channel.QueueBind( | |
queue.Name, // name of the queue | |
*bindingKey, // bindingKey | |
*exchange, // sourceExchange | |
false, // noWait | |
nil, // arguments | |
); err != nil { | |
log.Fatalf("Queue Bind: %s", err) | |
} | |
var c1 *Consumer | |
var c2 *Consumer | |
log.Printf("got Channel, setting Qos non-globally for consumer-1") | |
err = channel.Qos(1, 0, false) | |
if err != nil { | |
log.Fatalf("%s", err) | |
} | |
c1, err = NewConsumer(conn, channel, queue.Name, "consumer-1") | |
if err != nil { | |
log.Fatalf("%s", err) | |
} | |
log.Printf("got Channel, setting Qos non-globally for consumer-2") | |
err = channel.Qos(2, 0, false) | |
if err != nil { | |
log.Fatalf("%s", err) | |
} | |
c2, err = NewConsumer(conn, channel, queue.Name, "consumer-2") | |
if err != nil { | |
log.Fatalf("%s", err) | |
} | |
if *lifetime > 0 { | |
log.Printf("running for %s", *lifetime) | |
time.Sleep(*lifetime) | |
} else { | |
log.Printf("running forever") | |
select {} | |
} | |
log.Printf("shutting down") | |
if err := c1.Shutdown(); err != nil { | |
log.Fatalf("error during shutdown: %s", err) | |
} | |
if err := c2.Shutdown(); err != nil { | |
log.Fatalf("error during shutdown: %s", err) | |
} | |
} | |
func NewConsumer(conn *amqp.Connection, channel *amqp.Channel, queueName, ctag string) (*Consumer, error) { | |
c := &Consumer{ | |
conn: conn, | |
channel: channel, | |
tag: ctag, | |
done: make(chan error), | |
} | |
go func() { | |
fmt.Printf("closing: %s", <-c.conn.NotifyClose(make(chan *amqp.Error))) | |
}() | |
log.Printf("Starting Consume (consumer tag %q)", c.tag) | |
deliveries, err := c.channel.Consume( | |
queueName, // name | |
c.tag, // consumerTag, | |
false, // noAck | |
false, // exclusive | |
false, // noLocal | |
false, // noWait | |
nil, // arguments | |
) | |
if err != nil { | |
return nil, fmt.Errorf("Queue Consume: %s", err) | |
} | |
go handle(deliveries, c.tag, c.done) | |
return c, nil | |
} | |
func (c *Consumer) Shutdown() error { | |
// will close() the deliveries channel | |
if err := c.channel.Cancel(c.tag, true); err != nil { | |
return fmt.Errorf("Consumer cancel failed: %s", err) | |
} | |
if err := c.conn.Close(); err != nil { | |
return fmt.Errorf("AMQP connection close error: %s", err) | |
} | |
defer log.Printf("AMQP shutdown OK") | |
// wait for handle() to exit | |
return <-c.done | |
} | |
func handle(deliveries <-chan amqp.Delivery, ctag string, done chan error) { | |
for d := range deliveries { | |
log.Printf( | |
"ctag [%s] got %dB delivery: [%v] %q", | |
ctag, | |
len(d.Body), | |
d.DeliveryTag, | |
d.Body, | |
) | |
d.Ack(false) | |
} | |
log.Printf("handle: deliveries channel closed") | |
done <- nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment