Skip to content

Instantly share code, notes, and snippets.

@amitsaha
Forked from simpleton/consumer.go
Created December 11, 2017 06:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save amitsaha/410dd92e23308a2bf3360fe3a4042a8d to your computer and use it in GitHub Desktop.
Save amitsaha/410dd92e23308a2bf3360fe3a4042a8d to your computer and use it in GitHub Desktop.
Golang auto-reconnect rabbitmq consumer
package base
import (
"errors"
"fmt"
"github.com/manucorporat/try"
"github.com/simpleton/beego"
"github.com/streadway/amqp"
"math/rand"
"model/helper"
"os"
"runtime"
"time"
"sync/atomic"
)
// Consumer holds all infromation
// about the RabbitMQ connection
// This setup does limit a consumer
// to one exchange. This should not be
// an issue. Having to connect to multiple
// exchanges means something else is
// structured improperly.
type Consumer struct {
conn *amqp.Connection
channel *amqp.Channel
done chan error
consumerTag string // Name that consumer identifies itself to the server with
uri string // uri of the rabbitmq server
exchange string // exchange that we will bind to
exchangeType string // topic, direct, etc...
lastRecoverTime int64
//track service current status
currentStatus atomic.Value
}
const RECOVER_INTERVAL_TIME = 6 * 60
// NewConsumer returns a Consumer struct that has been initialized properly
// essentially don't touch conn, channel, or done and you can create Consumer manually
func newConsumer(consumerTag, uri, exchange, exchangeType string) *Consumer {
name, err := os.Hostname()
if err != nil {
name = "_sim"
}
consumer := &Consumer{
consumerTag: fmt.Sprintf("%s%s", consumerTag, name),
uri: uri,
exchange: exchange,
exchangeType: exchangeType,
done: make(chan error),
lastRecoverTime: time.Now().Unix(),
}
consumer.currentStatus.Store(true)
return consumer
}
func maxParallelism() int {
maxProcs := runtime.GOMAXPROCS(0)
numCPU := runtime.NumCPU()
if maxProcs < numCPU {
return maxProcs
}
return numCPU
}
func RunConsumer(consumerTag, exchange, exchangeType, queueName, routingKey string, handler func([]byte) bool) {
rabbitUri := fmt.Sprintf("amqp://%s:%s@%s/",
beego.AppConfig.String("mqAccount"),
beego.AppConfig.String("mqPassword"),
beego.AppConfig.String("mqAddress"),
)
consumer := newConsumer(consumerTag, rabbitUri, exchange, exchangeType)
if err := consumer.Connect(); err != nil {
helper.FailOnError(err, fmt.Sprintf("[%s]connect error", consumerTag))
}
deliveries, err := consumer.AnnounceQueue(queueName, routingKey)
helper.FailOnError(err, fmt.Sprintf("[%s]Error when calling AnnounceQueue()", consumerTag))
consumer.Handle(deliveries, handler, maxParallelism(), queueName, routingKey)
}
// ReConnect is called in places where NotifyClose() channel is called
// wait 30 seconds before trying to reconnect. Any shorter amount of time
// will likely destroy the error log while waiting for servers to come
// back online. This requires two parameters which is just to satisfy
// the AccounceQueue call and allows greater flexability
func (c *Consumer) ReConnect(queueName, routingKey string, retryTime int) (<-chan amqp.Delivery, error) {
c.Close()
time.Sleep(time.Duration(15 + rand.Intn(60) + 2*retryTime) * time.Second)
beego.Info("Try ReConnect with times:", retryTime)
if err := c.Connect(); err != nil {
return nil, err
}
deliveries, err := c.AnnounceQueue(queueName, routingKey)
if err != nil {
return deliveries, errors.New("Couldn't connect")
}
return deliveries, nil
}
// Connect to RabbitMQ server
func (c *Consumer) Connect() error {
var err error
beego.Info("dialing: ", c.uri)
c.conn, err = amqp.Dial(c.uri)
if err != nil {
return fmt.Errorf("Dial: %s", err)
}
go func() {
// Waits here for the channel to be closed
beego.Info("closing: ", <-c.conn.NotifyClose(make(chan *amqp.Error)))
// Let Handle know it's not time to reconnect
c.done <- errors.New("Channel Closed")
}()
beego.Info("got Connection, getting Channel")
c.channel, err = c.conn.Channel()
if err != nil {
return fmt.Errorf("Channel: %s", err)
}
beego.Info("got Channel, declaring Exchange ", c.exchange)
if err = c.channel.ExchangeDeclare(
c.exchange, // name of the exchange
c.exchangeType, // type
true, // durable
false, // delete when complete
false, // internal
false, // noWait
nil, // arguments
); err != nil {
return fmt.Errorf("Exchange Declare: %s", err)
}
return nil
}
// AnnounceQueue sets the queue that will be listened to for this
// connection...
func (c *Consumer) AnnounceQueue(queueName, routingKey string) (<-chan amqp.Delivery, error) {
beego.Info("declared Exchange, declaring Queue:", queueName)
queue, err := c.channel.QueueDeclare(
queueName, // name of the queue
true, // durable
false, // delete when usused
false, // exclusive
false, // noWait
nil, // arguments
)
if err != nil {
return nil, fmt.Errorf("Queue Declare: %s", err)
}
beego.Info(fmt.Sprintf("declared Queue (%q %d messages, %d consumers), binding to Exchange (key %q)",
queue.Name, queue.Messages, queue.Consumers, routingKey))
// Qos determines the amount of messages that the queue will pass to you before
// it waits for you to ack them. This will slow down queue consumption but
// give you more certainty that all messages are being processed. As load increases
// I would reccomend upping the about of Threads and Processors the go process
// uses before changing this although you will eventually need to reach some
// balance between threads, procs, and Qos.
err = c.channel.Qos(50, 0, false)
if err != nil {
return nil, fmt.Errorf("Error setting qos: %s", err)
}
if err = c.channel.QueueBind(
queue.Name, // name of the queue
routingKey, // routingKey
c.exchange, // sourceExchange
false, // noWait
nil, // arguments
); err != nil {
return nil, fmt.Errorf("Queue Bind: %s", err)
}
beego.Info("Queue bound to Exchange, starting Consume consumer tag:", c.consumerTag)
deliveries, err := c.channel.Consume(
queue.Name, // name
c.consumerTag, // consumerTag,
false, // noAck
false, // exclusive
false, // noLocal
false, // noWait
nil, // arguments
)
if err != nil {
return nil, fmt.Errorf("Queue Consume: %s", err)
}
return deliveries, nil
}
func (c *Consumer) Close() {
if c.channel != nil {
c.channel.Close()
c.channel = nil
}
if c.conn != nil {
c.conn.Close()
c.conn = nil
}
}
func (c *Consumer) Handle(
deliveries <-chan amqp.Delivery,
fn func([]byte) bool,
threads int,
queue string,
routingKey string) {
var err error
for {
beego.Info("Enter for busy loop with thread:", threads)
for i := 0; i < threads; i++ {
go func() {
beego.Info("Enter go with thread with deliveries", deliveries)
for msg := range deliveries {
beego.Info("Enter deliver")
ret := false
try.This(func() {
body := msg.Body[:]
ret = fn(body)
}).Finally(func() {
if ret == true {
msg.Ack(false)
currentTime := time.Now().Unix()
if currentTime-c.lastRecoverTime > RECOVER_INTERVAL_TIME && !c.currentStatus.Load().(bool) {
beego.Info("Try to Recover Unack Messages!")
c.currentStatus.Store(true)
c.lastRecoverTime = currentTime
c.channel.Recover(true)
}
} else {
// this really a litter dangerous. if the worker is panic very quickly,
// it will ddos our sentry server......plz, add [retry-ttl] in header.
//msg.Nack(false, true)
c.currentStatus.Store(false)
}
}).Catch(func(e try.E) {
helper.SentryError(e)
})
}
}()
}
// Go into reconnect loop when
// c.done is passed non nil values
if <-c.done != nil {
c.currentStatus.Store(false)
retryTime := 1
for {
deliveries, err = c.ReConnect(queue, routingKey, retryTime)
if err != nil {
helper.FailOnError(err, "Reconnecting Error")
retryTime += 1
} else {
break
}
}
}
beego.Info("Reconnected!!!")
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment