Skip to content

Instantly share code, notes, and snippets.

Forked from ribice/caller.go
Created September 17, 2020 16:54
Show Gist options
  • Save kamal-github/dbdb78d6fab7f284d7ff2b3d20afbe14 to your computer and use it in GitHub Desktop.
Save kamal-github/dbdb78d6fab7f284d7ff2b3d20afbe14 to your computer and use it in GitHub Desktop.
A robust rabbitmq client for Go
const (
	// reconnectDelay is the base wait between attempts to re-establish the
	// connection to the server after a connection failure.
	reconnectDelay = 5 * time.Second
)
// Client holds necessery information for rabbitMQ
type Client struct {
pushQueue string
logger zerolog.Logger
connection *amqp.Connection
channel *amqp.Channel
done chan os.Signal
notifyClose chan *amqp.Error
notifyConfirm chan amqp.Confirmation
isConnected bool
alive bool
threads int
wg *sync.WaitGroup
// New is a constructor that takes address, push and listen queue names, logger, and a channel that will notify rabbitmq client on server shutdown. We calculate the number of threads, create the client, and start the connection process. Connect method connects to the rabbitmq server and creates push/listen channels if they don't exist.
func New(listenQueue, pushQueue, addr string, l zerolog.Logger, done chan os.Signal) *Client {
threads := runtime.GOMAXPROCS(0)
if numCPU := runtime.NumCPU(); numCPU > threads {
threads = numCPU
client := Client{
logger: l,
threads: threads,
pushQueue: pushQueue,
done: done,
alive: true,
wg: &sync.WaitGroup{},
go client.handleReconnect(addr)
return &client
// handleReconnect will wait for a connection error on
// notifyClose, and then continuously attempt to reconnect.
func (c *Client) handleReconnect(listenQueue, addr string) {
for c.alive {
c.isConnected = false
fmt.Printf("Attempting to connect to rabbitMQ: %s\n", addr)
var retryCount int
for !c.connect(listenQueue, addr) {
c.logger.Printf("disconnected from rabbitMQ and failed to connect")
time.Sleep(reconnectDelay + time.Duration(retryCount)*time.Second)
select {
case <-c.done:
case <-c.notifyClose:
// connect will make a single attempt to connect to
// RabbitMq. It returns the success of the attempt.
func (c *Client) connect(listenQueue, addr string) bool {
conn, err := amqp.Dial(addr)
if err != nil {
c.logger.Printf("failed to dial rabbitMQ server: %v", err)
return false
ch, err := conn.Channel()
if err != nil {
c.logger.Printf("failed connecting to channel: %v", err)
return false
_, err = ch.QueueDeclare(
true, // Durable
false, // Delete when unused
false, // Exclusive
false, // No-wait
nil, // Arguments
if err != nil {
c.logger.Printf("failed to declare listen queue: %v", err)
return false
_, err = ch.QueueDeclare(
true, // Durable
false, // Delete when unused
false, // Exclusive
false, // No-wait
nil, // Arguments
if err != nil {
c.logger.Printf("failed to declare push queue: %v", err)
return false
c.changeConnection(conn, ch)
c.isConnected = true
return true
// changeConnection takes a new connection to the queue,
// and updates the channel listeners to reflect this.
func (c *Client) changeConnection(connection *amqp.Connection, channel *amqp.Channel) {
c.connection = connection = channel
c.notifyClose = make(chan *amqp.Error)
c.notifyConfirm = make(chan amqp.Confirmation)
// Push publishes data through UnsafePush and waits on c.notifyConfirm for the
// broker's confirmation, returning nil once a confirmation with Ack set is
// received. When the client is not connected it returns an error immediately.
// Each wait is bounded by a resendDelay timer; presumably the loop then
// publishes the message again — TODO confirm, that branch is not visible here.
// NOTE(review): this listing appears to have lost lines (closing braces and
// the body of the `if err != nil` check); restore them from the original
// source before relying on this function.
func (c *Client) Push(data []byte) error {
if !c.isConnected {
// NOTE(review): "push push" looks like a typo; it is runtime text and is left untouched here.
return errors.New("failed to push push: not connected")
// The only exit visible in this loop is the acknowledged confirmation below.
for {
err := c.UnsafePush(data)
if err != nil {
select {
// A publisher confirmation arrived on the notification channel.
case confirm := <-c.notifyConfirm:
if confirm.Ack {
return nil
// No confirmation arrived within resendDelay.
case <-time.After(resendDelay):
// UnsafePush returns errNotConnected when the client is not connected;
// otherwise it hands data to a call whose arguments describe a message with
// content type "text/plain" and data as its body. No confirmation handling is
// visible in this function.
// NOTE(review): the call expression that receives these arguments, the
// routing-key argument, and the closing braces are missing from this listing
// (presumably a publish on c.channel — TODO confirm against the original).
func (c *Client) UnsafePush(data []byte) error {
if !c.isConnected {
return errNotConnected
"", // Exchange, // Routing key
false, // Mandatory
false, // Immediate
ContentType: "text/plain",
Body: data,
// Stream starts the message consumers. It checks c.isConnected in a loop,
// sleeping one second per iteration, then for each i in 1..c.threads obtains
// a delivery channel under the consumer name consumerName(i) and starts a
// goroutine that selects between cancelCtx.Done() and incoming deliveries.
// Each goroutine calls c.wg.Done() when it exits. An error from either setup
// call is returned to the caller; otherwise Stream returns nil.
// NOTE(review): several pieces are missing from this listing — the call
// targets of both `err :=` assignments, the queue name, the bodies of the two
// select cases, and all closing braces. No matching c.wg.Add is visible
// either; verify against the original source.
func (c *Client) Stream(cancelCtx context.Context) error {
// Poll until the client reports a live connection (presumably breaking out of
// the loop once connected — TODO confirm, the exit statement is not visible).
for {
if c.isConnected {
time.Sleep(1 * time.Second)
// NOTE(review): call target elided; the arguments that remain are 0 and false.
err :=, 0, false)
if err != nil {
return err
// One consumer per configured thread, numbered from 1.
for i := 1; i <= c.threads; i++ {
// NOTE(review): call target and queue-name argument elided.
msgs, err :=
consumerName(i), // Consumer
false, // Auto-Ack
false, // Exclusive
false, // No-local
false, // No-Wait
nil, // Args
if err != nil {
return err
go func() {
defer c.wg.Done()
for {
select {
case <-cancelCtx.Done():
case msg := <-msgs:
return nil
// event is the JSON envelope of a message consumed from the queue.
type event struct {
	// Job selects the handler; parseEvent switches on this value.
	Job string `json:"job"`
	// Data is the job payload; parseEvent rejects events where it is empty.
	Data string `json:"data"`
}
// parseEvent handles a single delivery: it decodes msg.Body as JSON into an
// event, rejects the delivery through logAndNack when decoding fails or when
// evt.Data is empty, dispatches on evt.Job, and logs the outcome together
// with the elapsed time in milliseconds. A deferred function recovers from
// panics, logs the stack trace, and nacks the delivery.
// NOTE(review): this listing looks incomplete and template-like, and needs
// attention before it can compile:
//   - closing braces, and presumably a `return` after each logAndNack call,
//     are not present;
//   - `ctx` is passed to the deferred closure but is not declared here;
//   - `err = func()` is a placeholder for a real job handler;
//   - the switch lists case "job1" twice;
//   - the final Msgf call lacks a comma between the format string and evt.Job.
func (c *Client) parseEvent(msg amqp.Delivery) {
// A single log event is built up and emitted once at the end of processing.
l := c.logger.Log().Timestamp()
startTime := time.Now()
var evt event
err := json.Unmarshal(msg.Body, &evt)
if err != nil {
logAndNack(msg, l, startTime, "unmarshalling body: %s - %s", string(msg.Body), err.Error())
if evt.Data == "" {
logAndNack(msg, l, startTime, "received event without data")
// Recover from a panic so that one bad message is logged and nacked.
// NOTE(review): the closure's own parameters (ctx, e, m, logger) are unused;
// its body refers to the outer l and msg instead.
defer func(ctx context.Context, e event, m amqp.Delivery, logger *zerolog.Event) {
if err := recover(); err != nil {
// Capture the current goroutine's stack trace, truncated to the buffer size.
stack := make([]byte, 8096)
stack = stack[:runtime.Stack(stack, false)]
l.Bytes("stack", stack).Str("level", "fatal").Interface("error", err).Msg("panic recovery for rabbitMQ message")
msg.Nack(false, false)
}(ctx, evt, msg, l)
switch evt.Job {
case "job1":
// Call an actual function
err = func()
case "job1":
err = func()
if err != nil {
// NOTE(review): err.Error() is passed as the format string of logAndNack.
logAndNack(msg, l, startTime, err.Error())
l.Str("level", "info").Int64("took-ms", time.Since(startTime).Milliseconds()).Msgf("%s succeeded" evt.Job)
func logAndNack(msg amqp.Delivery, l *zerolog.Event, t time.Time, err string, args ...interface{}) {
msg.Nack(false, false)
l.Int64("took-ms", time.Since(t).Milliseconds()).Str("level", "error").Msg(fmt.Sprintf(err, args...))
// Close shuts the client down. It returns errAlreadyClosed when the client is
// not connected; otherwise it clears c.alive, issues one call per consumer
// (1..c.threads), closes the connection, and marks the client as
// disconnected. The first error encountered is returned.
// NOTE(review): two call expressions are truncated in this listing
// (`err :=, false)` and the bare `err :=`), and the closing braces are
// missing. Judging by the error message and by consumerName, the first
// presumably cancels each consumer on the channel and the second presumably
// closes the channel — TODO confirm against the original source. No wait on
// c.wg is visible despite the "Waiting for current messages" message.
func (c *Client) Close() error {
if !c.isConnected {
return errAlreadyClosed
// Clearing alive makes the handleReconnect loop condition false.
c.alive = false
fmt.Println("Waiting for current messages to be processed...")
// Consumers are numbered from 1, matching the names used in Stream.
for i := 1; i <= c.threads; i++ {
fmt.Println("Closing consumer: ", i)
err :=, false)
if err != nil {
return fmt.Errorf("error canceling consumer %s: %v", consumerName(i), err)
err :=
if err != nil {
return err
err = c.connection.Close()
if err != nil {
return err
c.isConnected = false
fmt.Println("gracefully stopped rabbitMQ connection")
return nil
// consumerName returns the consumer tag for the i-th consumer, in the form
// "go-consumer-<i>". Stream and Close both derive their tags from it, so the
// two stay in agreement.
func consumerName(i int) string {
	return fmt.Sprintf("go-consumer-%d", i)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment