Last active
February 27, 2018 15:55
-
-
Save lukebakken/4c390610ed44594d5be5450373a54504 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// This example declares a durable Exchange, an ephemeral (auto-delete) Queue, | |
// binds the Queue to the Exchange with a binding key, and consumes every | |
// message published to that Exchange with that routing key. | |
// | |
package main | |
import ( | |
"flag" | |
"fmt" | |
"github.com/streadway/amqp" | |
"log" | |
"time" | |
) | |
var ( | |
uri = flag.String("uri", "amqp://guest:guest@localhost:5672/", "AMQP URI") | |
exchange = flag.String("exchange", "test-exchange", "Durable, non-auto-deleted AMQP exchange name") | |
exchangeType = flag.String("exchange-type", "direct", "Exchange type - direct|fanout|topic|x-custom") | |
queue = flag.String("queue", "test-queue", "AMQP queue name") | |
bindingKey = flag.String("key", "test-key", "AMQP binding key") | |
consumerTag = flag.String("consumer-tag", "simple-consumer", "AMQP consumer tag (should not be blank)") | |
autoDelete = flag.Bool("auto-delete", false, "Should queue be auto-delete or not (default is false)") | |
lifetime = flag.Duration("lifetime", 5*time.Second, "lifetime of process before shutdown (0s=infinite)") | |
) | |
func init() { | |
flag.Parse() | |
} | |
func main() { | |
c, err := NewConsumer(*uri, *exchange, *exchangeType, *queue, *bindingKey, *consumerTag, *autoDelete) | |
if err != nil { | |
log.Fatalf("%s", err) | |
} | |
if *lifetime > 0 { | |
log.Printf("running for %s", *lifetime) | |
time.Sleep(*lifetime) | |
} else { | |
log.Printf("running forever") | |
select {} | |
} | |
log.Printf("shutting down") | |
if err := c.Shutdown(); err != nil { | |
log.Fatalf("error during shutdown: %s", err) | |
} | |
} | |
type Consumer struct { | |
conn *amqp.Connection | |
channel *amqp.Channel | |
tag string | |
done chan error | |
} | |
func NewConsumer(amqpURI, exchange, exchangeType, queueName, key, ctag string, autoDelete bool) (*Consumer, error) { | |
c := &Consumer{ | |
conn: nil, | |
channel: nil, | |
tag: ctag, | |
done: make(chan error), | |
} | |
var err error | |
log.Printf("dialing %q", amqpURI) | |
c.conn, err = amqp.Dial(amqpURI) | |
if err != nil { | |
return nil, fmt.Errorf("Dial: %s", err) | |
} | |
go func() { | |
fmt.Printf("closing: %s", <-c.conn.NotifyClose(make(chan *amqp.Error))) | |
}() | |
log.Printf("got Connection, getting Channel") | |
c.channel, err = c.conn.Channel() | |
if err != nil { | |
return nil, fmt.Errorf("Channel: %s", err) | |
} | |
log.Printf("got Channel, declaring Exchange (%q)", exchange) | |
if err = c.channel.ExchangeDeclare( | |
exchange, // name of the exchange | |
exchangeType, // type | |
true, // durable | |
false, // delete when complete | |
false, // internal | |
false, // noWait | |
nil, // arguments | |
); err != nil { | |
return nil, fmt.Errorf("Exchange Declare: %s", err) | |
} | |
log.Printf("declared Exchange, declaring Queue %q", queueName) | |
args := make(amqp.Table) | |
args["x-expires"] = int32(10000) | |
queue, err := c.channel.QueueDeclare( | |
queueName, // name of the queue | |
true, // durable | |
autoDelete, // delete when unused | |
false, // exclusive | |
false, // noWait | |
args, // arguments | |
) | |
if err != nil { | |
return nil, fmt.Errorf("Queue Declare: %s", err) | |
} | |
log.Printf("declared Queue (%q %d messages, %d consumers), binding to Exchange (key %q)", | |
queue.Name, queue.Messages, queue.Consumers, key) | |
if err = c.channel.QueueBind( | |
queue.Name, // name of the queue | |
key, // bindingKey | |
exchange, // sourceExchange | |
false, // noWait | |
nil, // arguments | |
); err != nil { | |
return nil, fmt.Errorf("Queue Bind: %s", err) | |
} | |
log.Printf("Queue bound to Exchange, starting Consume (consumer tag %q)", c.tag) | |
deliveries, err := c.channel.Consume( | |
queue.Name, // name | |
c.tag, // consumerTag, | |
false, // noAck | |
false, // exclusive | |
false, // noLocal | |
false, // noWait | |
nil, // arguments | |
) | |
if err != nil { | |
return nil, fmt.Errorf("Queue Consume: %s", err) | |
} | |
go handle(deliveries, c.done) | |
return c, nil | |
} | |
func (c *Consumer) Shutdown() error { | |
// will close() the deliveries channel | |
if err := c.channel.Cancel(c.tag, true); err != nil { | |
return fmt.Errorf("Consumer cancel failed: %s", err) | |
} | |
if err := c.conn.Close(); err != nil { | |
return fmt.Errorf("AMQP connection close error: %s", err) | |
} | |
defer log.Printf("AMQP shutdown OK") | |
// wait for handle() to exit | |
return <-c.done | |
} | |
func handle(deliveries <-chan amqp.Delivery, done chan error) { | |
for d := range deliveries { | |
log.Printf( | |
"got %dB delivery: [%v] %q", | |
len(d.Body), | |
d.DeliveryTag, | |
d.Body, | |
) | |
d.Ack(false) | |
} | |
log.Printf("handle: deliveries channel closed") | |
done <- nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment