Skip to content

Instantly share code, notes, and snippets.

@simpleton
Last active October 19, 2023 05:17
Show Gist options
  • Star 28 You must be signed in to star a gist
  • Fork 7 You must be signed in to fork a gist
  • Save simpleton/9a2d4fde5c73af8f3f453df9747a86e5 to your computer and use it in GitHub Desktop.
Save simpleton/9a2d4fde5c73af8f3f453df9747a86e5 to your computer and use it in GitHub Desktop.
Golang auto-reconnect rabbitmq consumer
package base
import (
"errors"
"fmt"
"github.com/manucorporat/try"
"github.com/simpleton/beego"
"github.com/streadway/amqp"
"math/rand"
"model/helper"
"os"
"runtime"
"time"
"sync/atomic"
)
// Consumer holds all information about a single RabbitMQ connection:
// the connection and channel handles, the exchange the consumer binds
// to, and the bookkeeping used by the reconnect/recover logic.
//
// This setup limits a consumer to one exchange. That should not be an
// issue: having to connect to multiple exchanges means something else
// is structured improperly.
type Consumer struct {
conn *amqp.Connection // set by Connect, cleared by Close
channel *amqp.Channel // set by Connect, cleared by Close
done chan error // receives an error once the connection's NotifyClose fires
consumerTag string // Name that consumer identifies itself to the server with
uri string // uri of the rabbitmq server
exchange string // exchange that we will bind to
exchangeType string // topic, direct, etc...
lastRecoverTime int64 // Unix seconds; creation time at first, then the time of the last channel Recover
// Tracks the current service status as a bool: stored as false after a
// handler failure or a connection loss, and as true again once a
// Recover has been issued.
currentStatus atomic.Value
}
// RECOVER_INTERVAL_TIME is the minimum number of seconds (6 minutes) that
// must have elapsed since lastRecoverTime before Handle issues another
// channel Recover.
const RECOVER_INTERVAL_TIME = 6 * 60
// newConsumer builds a Consumer for the given server URI and exchange.
//
// The tag announced to the server is consumerTag suffixed with the local
// hostname, or with "_sim" when the hostname cannot be determined. The
// connection and channel are left unset until Connect is called; the
// status flag starts out true and lastRecoverTime is the creation time.
func newConsumer(consumerTag, uri, exchange, exchangeType string) *Consumer {
	host, hostErr := os.Hostname()
	if hostErr != nil {
		host = "_sim"
	}

	c := new(Consumer)
	c.consumerTag = fmt.Sprintf("%s%s", consumerTag, host)
	c.uri = uri
	c.exchange = exchange
	c.exchangeType = exchangeType
	c.done = make(chan error)
	c.lastRecoverTime = time.Now().Unix()
	c.currentStatus.Store(true)
	return c
}
// maxParallelism reports how many goroutines can usefully run in
// parallel: the smaller of the current GOMAXPROCS setting and the number
// of logical CPUs.
func maxParallelism() int {
	limit := runtime.NumCPU()
	if procs := runtime.GOMAXPROCS(0); procs < limit {
		limit = procs
	}
	return limit
}
// RunConsumer connects to the broker described by the beego app config
// keys mqAccount, mqPassword and mqAddress, declares queueName, binds it
// to exchange with routingKey, and hands the deliveries to Handle.
//
// handler receives each message body; returning true acknowledges the
// message. Handle loops forever, so this call does not return in normal
// operation. Connection and queue errors are passed to
// helper.FailOnError.
func RunConsumer(consumerTag, exchange, exchangeType, queueName, routingKey string, handler func([]byte) bool) {
	account := beego.AppConfig.String("mqAccount")
	password := beego.AppConfig.String("mqPassword")
	address := beego.AppConfig.String("mqAddress")
	brokerURI := fmt.Sprintf("amqp://%s:%s@%s/", account, password, address)

	c := newConsumer(consumerTag, brokerURI, exchange, exchangeType)

	connectErr := c.Connect()
	if connectErr != nil {
		helper.FailOnError(connectErr, fmt.Sprintf("[%s]connect error", consumerTag))
	}

	msgs, announceErr := c.AnnounceQueue(queueName, routingKey)
	helper.FailOnError(announceErr, fmt.Sprintf("[%s]Error when calling AnnounceQueue()", consumerTag))

	c.Handle(msgs, handler, maxParallelism(), queueName, routingKey)
}
// ReConnect closes the current connection, waits, and then dials again
// and re-announces the queue. Handle calls it after the connection's
// NotifyClose has fired.
//
// The wait is 15 seconds plus 0-59 seconds of random jitter plus
// 2 seconds per retryTime. A much shorter wait is likely to flood the
// error log while the servers are coming back online.
//
// queueName and routingKey are passed straight through to AnnounceQueue.
// On failure the returned delivery channel is nil and the error carries
// the underlying cause.
func (c *Consumer) ReConnect(queueName, routingKey string, retryTime int) (<-chan amqp.Delivery, error) {
	c.Close()

	backoff := time.Duration(15+rand.Intn(60)+2*retryTime) * time.Second
	time.Sleep(backoff)

	beego.Info("Try ReConnect with times:", retryTime)
	if err := c.Connect(); err != nil {
		return nil, err
	}

	deliveries, err := c.AnnounceQueue(queueName, routingKey)
	if err != nil {
		// Keep the real cause in the message; a bare "Couldn't connect"
		// hides whether declare, qos, bind or consume failed.
		return nil, fmt.Errorf("Couldn't connect: %s", err)
	}
	return deliveries, nil
}
// Connect dials the RabbitMQ server at c.uri, opens a channel on the new
// connection and declares the durable exchange c.exchange of type
// c.exchangeType.
//
// A background goroutine waits for the connection's close notification
// and then sends an error on c.done, which is what makes Handle start its
// reconnect loop.
//
// It returns an error describing the step that failed (Dial, Channel or
// Exchange Declare); c.conn may already be set in that case.
func (c *Consumer) Connect() error {
	// Never write the raw URI to the log: it embeds the account password.
	// The parse error is not logged either, since it may quote the URI.
	if parsed, parseErr := amqp.ParseURI(c.uri); parseErr == nil {
		beego.Info("dialing: ", fmt.Sprintf("%s://%s@%s:%d (vhost %q)",
			parsed.Scheme, parsed.Username, parsed.Host, parsed.Port, parsed.Vhost))
	} else {
		beego.Info("dialing: <uri hidden, could not be parsed>")
	}

	var err error
	c.conn, err = amqp.Dial(c.uri)
	if err != nil {
		return fmt.Errorf("Dial: %s", err)
	}

	// Register the close listener here rather than inside the goroutine,
	// so the goroutine never dereferences c.conn after Close has set it
	// to nil.
	closed := c.conn.NotifyClose(make(chan *amqp.Error))
	go func() {
		// Waits here for the connection to be closed
		beego.Info("closing: ", <-closed)
		// Let Handle know it is now time to reconnect
		c.done <- errors.New("Channel Closed")
	}()

	beego.Info("got Connection, getting Channel")
	c.channel, err = c.conn.Channel()
	if err != nil {
		return fmt.Errorf("Channel: %s", err)
	}

	beego.Info("got Channel, declaring Exchange ", c.exchange)
	if err = c.channel.ExchangeDeclare(
		c.exchange,     // name of the exchange
		c.exchangeType, // type
		true,           // durable
		false,          // delete when complete
		false,          // internal
		false,          // noWait
		nil,            // arguments
	); err != nil {
		return fmt.Errorf("Exchange Declare: %s", err)
	}
	return nil
}
// AnnounceQueue prepares the queue this connection will listen on and
// starts consuming from it. In order, it declares queueName as a durable
// queue, sets the channel prefetch count, binds the queue to c.exchange
// with routingKey, and registers a consumer with manual acknowledgement.
//
// It returns the delivery channel, or nil and an error naming the step
// that failed.
func (c *Consumer) AnnounceQueue(queueName, routingKey string) (<-chan amqp.Delivery, error) {
	beego.Info("declared Exchange, declaring Queue:", queueName)

	// Arguments: name, durable, delete when unused, exclusive, noWait, arguments.
	q, declareErr := c.channel.QueueDeclare(queueName, true, false, false, false, nil)
	if declareErr != nil {
		return nil, fmt.Errorf("Queue Declare: %s", declareErr)
	}
	beego.Info(fmt.Sprintf("declared Queue (%q %d messages, %d consumers), binding to Exchange (key %q)",
		q.Name, q.Messages, q.Consumers, routingKey))

	// Qos caps how many messages the server hands over before it waits
	// for acks. A lower value slows consumption but gives more certainty
	// that every message is processed. Under growing load, raise the
	// number of threads and processors first; only then tune this value.
	if qosErr := c.channel.Qos(50, 0, false); qosErr != nil {
		return nil, fmt.Errorf("Error setting qos: %s", qosErr)
	}

	// Arguments: queue name, routing key, source exchange, noWait, arguments.
	bindErr := c.channel.QueueBind(q.Name, routingKey, c.exchange, false, nil)
	if bindErr != nil {
		return nil, fmt.Errorf("Queue Bind: %s", bindErr)
	}

	beego.Info("Queue bound to Exchange, starting Consume consumer tag:", c.consumerTag)

	// Arguments: queue, consumer tag, noAck, exclusive, noLocal, noWait, arguments.
	msgs, consumeErr := c.channel.Consume(q.Name, c.consumerTag, false, false, false, false, nil)
	if consumeErr != nil {
		return nil, fmt.Errorf("Queue Consume: %s", consumeErr)
	}
	return msgs, nil
}
// Close shuts down the channel and then the connection, if they are set,
// and clears both fields. Errors from the underlying Close calls are
// ignored; calling Close on an already closed Consumer is a no-op.
func (c *Consumer) Close() {
	if ch := c.channel; ch != nil {
		ch.Close()
		c.channel = nil
	}
	if conn := c.conn; conn != nil {
		conn.Close()
		c.conn = nil
	}
}
// Handle processes deliveries with fn and reconnects when the connection
// is lost. It loops forever and never returns.
//
// Each pass of the outer loop starts `threads` worker goroutines that
// range over the current deliveries channel, then blocks on c.done. When
// a non-nil value arrives, the status flag is set to false and ReConnect
// is retried with an increasing retryTime until it succeeds. The next
// pass then starts fresh workers on the new deliveries channel.
//
// Parameters:
//   - deliveries: the channel returned by AnnounceQueue.
//   - fn: called with each message body. Returning true acks the message.
//     Returning false leaves it un-acked and sets the status flag to false.
//   - threads: number of worker goroutines started per pass.
//   - queue, routingKey: forwarded to ReConnect.
//
// NOTE(review): c.lastRecoverTime is read and written by all worker
// goroutines without synchronization.
// NOTE(review): what helper.FailOnError does on a non-nil error is not
// visible here; if it panics or exits, the retry loop below never runs a
// second attempt. Confirm against the helper package.
func (c *Consumer) Handle(
deliveries <-chan amqp.Delivery,
fn func([]byte) bool,
threads int,
queue string,
routingKey string) {
var err error
for {
beego.Info("Enter for busy loop with thread:", threads)
for i := 0; i < threads; i++ {
go func() {
beego.Info("Enter go with thread with deliveries", deliveries)
// The worker ranges over the deliveries channel until that loop ends.
for msg := range deliveries {
beego.Info("Enter deliver")
ret := false
// presumably try.This recovers a panic raised by fn and routes it to
// Catch, with Finally running either way - confirm against the try package
try.This(func() {
body := msg.Body[:]
ret = fn(body)
}).Finally(func() {
if ret == true {
msg.Ack(false)
currentTime := time.Now().Unix()
// After an earlier failure (status false), ask the server to redeliver
// un-acked messages, but only if more than RECOVER_INTERVAL_TIME seconds
// have passed since lastRecoverTime.
if currentTime-c.lastRecoverTime > RECOVER_INTERVAL_TIME && !c.currentStatus.Load().(bool) {
beego.Info("Try to Recover Unack Messages!")
c.currentStatus.Store(true)
c.lastRecoverTime = currentTime
c.channel.Recover(true)
}
} else {
// Nack-with-requeue is deliberately disabled: it is a little dangerous.
// If the worker panics very quickly it will DDoS our Sentry server.
// Please add [retry-ttl] in the header before re-enabling it.
//msg.Nack(false, true)
// The message stays un-acked; the flag lets a later success trigger Recover.
c.currentStatus.Store(false)
}
}).Catch(func(e try.E) {
helper.SentryError(e)
})
}
}()
}
// Block until the connection watcher reports a close, then go into
// the reconnect loop when c.done delivers a non-nil value.
if <-c.done != nil {
c.currentStatus.Store(false)
retryTime := 1
for {
// Assigns the outer deliveries variable, so the workers started on
// the next pass range over the new channel.
deliveries, err = c.ReConnect(queue, routingKey, retryTime)
if err != nil {
helper.FailOnError(err, "Reconnecting Error")
retryTime += 1
} else {
break
}
}
}
beego.Info("Reconnected!!!")
}
}
@harrisonturton
Copy link

Unstopped goroutines? That's a memory leak ;)

I wrote a similar solution that automatically reconnects and confirms all publishes. It doesn't automatically ack the messages (I didn't need that functionality), but that could be easily handled.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment