Skip to content

Instantly share code, notes, and snippets.

@kamal-github
Forked from tomekbielaszewski/main.go
Created September 13, 2020 14:40
Show Gist options
  • Save kamal-github/22154f7c6f4f192647665a1de6d4de6d to your computer and use it in GitHub Desktop.
Save kamal-github/22154f7c6f4f192647665a1de6d4de6d to your computer and use it in GitHub Desktop.
Example of RabbitMQ reconnect feature. Including recovering already registered consumers.
package main
import (
"fmt"
"log"
"time"
)
func main() {
queue := NewQueue("amqp://guest:guest@localhost:5672/", "hello")
defer queue.Close()
queue.Consume(func(i string) {
log.Printf("Received message with second consumer: %s", i)
})
queue.Consume(func(i string) {
log.Printf("Received message with first consumer: %s", i)
})
for i := 0; i < 100; i++ {
log.Println("Sending message...")
queue.Send(fmt.Sprint("dupa", i))
time.Sleep(500 * time.Millisecond)
}
}
package main
import (
"github.com/streadway/amqp"
"log"
"time"
)
type queue struct {
url string
name string
errorChannel chan *amqp.Error
connection *amqp.Connection
channel *amqp.Channel
closed bool
consumers []messageConsumer
}
type messageConsumer func(string)
func NewQueue(url string, qName string) *queue {
q := new(queue)
q.url = url
q.name = qName
q.consumers = make([]messageConsumer, 0)
q.connect()
go q.reconnector()
return q
}
func (q *queue) Send(message string) {
err := q.channel.Publish(
"", // exchange
q.name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
})
logError("Sending message to queue failed", err)
}
func (q *queue) Consume(consumer messageConsumer) {
log.Println("Registering consumer...")
deliveries, err := q.registerQueueConsumer()
log.Println("Consumer registered! Processing messages...")
q.executeMessageConsumer(err, consumer, deliveries, false)
}
func (q *queue) Close() {
log.Println("Closing connection")
q.closed = true
q.channel.Close()
q.connection.Close()
}
func (q *queue) reconnector() {
for {
err := <-q.errorChannel
if !q.closed {
logError("Reconnecting after connection closed", err)
q.connect()
q.recoverConsumers()
}
}
}
func (q *queue) connect() {
for {
log.Printf("Connecting to rabbitmq on %s\n", q.url)
conn, err := amqp.Dial(q.url)
if err == nil {
q.connection = conn
q.errorChannel = make(chan *amqp.Error)
q.connection.NotifyClose(q.errorChannel)
log.Println("Connection established!")
q.openChannel()
q.declareQueue()
return
}
logError("Connection to rabbitmq failed. Retrying in 1 sec... ", err)
time.Sleep(1000 * time.Millisecond)
}
}
func (q *queue) declareQueue() {
_, err := q.channel.QueueDeclare(
q.name, // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
logError("Queue declaration failed", err)
}
func (q *queue) openChannel() {
channel, err := q.connection.Channel()
logError("Opening channel failed", err)
q.channel = channel
}
func (q *queue) registerQueueConsumer() (<-chan amqp.Delivery, error) {
msgs, err := q.channel.Consume(
q.name, // queue
"", // messageConsumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
logError("Consuming messages from queue failed", err)
return msgs, err
}
func (q *queue) executeMessageConsumer(err error, consumer messageConsumer, deliveries <-chan amqp.Delivery, isRecovery bool) {
if err == nil {
if !isRecovery {
q.consumers = append(q.consumers, consumer)
}
go func() {
for delivery := range deliveries {
consumer(string(delivery.Body[:]))
}
}()
}
}
func (q *queue) recoverConsumers() {
for i := range q.consumers {
var consumer = q.consumers[i]
log.Println("Recovering consumer...")
msgs, err := q.registerQueueConsumer()
log.Println("Consumer recovered! Continuing message processing...")
q.executeMessageConsumer(err, consumer, msgs, true)
}
}
func logError(message string, err error) {
if err != nil {
log.Printf("%s: %s", message, err)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment