Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
RabbitMQ client that automatically reconnects when the connection fails, and has a confirmed push method (i.e. the server is guaranteed to receive the message)
package main
import (
"errors"
"github.com/streadway/amqp"
"log"
"os"
"time"
)
// Queue is a self-healing handle on a single named RabbitMQ queue.
// It owns the current AMQP connection and channel, both of which are
// replaced whenever the background reconnect loop re-establishes the link.
type Queue struct {
	// Identity and diagnostics.
	name   string
	logger *log.Logger

	// Current AMQP link; overwritten on every successful reconnect.
	connection *amqp.Connection
	channel    *amqp.Channel

	// Signalling. done is closed by Close to stop the reconnect loop;
	// notifyClose and notifyConfirm are re-registered on each new channel.
	done          chan bool
	notifyClose   chan *amqp.Error
	notifyConfirm chan amqp.Confirmation

	// isConnected reports whether connection/channel are currently usable.
	// NOTE(review): written by the reconnect goroutine and read by the
	// public methods without synchronization; consider guarding it with a
	// mutex (or an atomic) to avoid a data race.
	isConnected bool
}
// Retry timings used by the reconnect loop and by Push.
const (
	// reconnectDelay is the pause between failed attempts to connect to
	// the server.
	reconnectDelay time.Duration = 5 * time.Second

	// resendDelay is how long Push waits for a publisher confirm before
	// publishing the same message again.
	resendDelay time.Duration = 5 * time.Second
)
// errNotConnected is returned by operations that need a live connection
// when the queue currently has none.
var errNotConnected = errors.New("not connected to the queue")

// errNotConfirmed describes a publish the server did not confirm.
// It is not currently returned by any method in this file.
var errNotConfirmed = errors.New("message not confirmed")

// errAlreadyClosed is returned by Close when the queue is not connected.
var errAlreadyClosed = errors.New("already closed: not connected to the queue")
// New returns a Queue bound to the queue called name and starts a
// background goroutine that connects to the RabbitMQ server at addr,
// reconnecting whenever the link drops. New returns immediately, so the
// connection may not be established yet when it does.
func New(name string, addr string) *Queue {
	q := &Queue{
		name:   name,
		logger: log.New(os.Stdout, "", log.LstdFlags),
		done:   make(chan bool),
	}
	go q.handleReconnect(addr)
	return q
}
// handleReconnect keeps the queue connected for as long as it is open. It
// runs in its own goroutine (started by New) and loops:
//
//  1. Mark the queue disconnected and try to connect to addr, retrying every
//     reconnectDelay until an attempt succeeds.
//  2. Wait until either done is closed (Close was called) or the channel
//     reports that it closed, then start over.
//
// The goroutine returns when done is closed. It also returns when
// notifyClose is closed without delivering an error. The amqp library
// signals a graceful, locally initiated shutdown this way, and in this file
// only Close closes the channel. Without this check the loop could
// reconnect just before Close closes done and leak a new connection.
func (queue *Queue) handleReconnect(addr string) {
	for {
		queue.isConnected = false
		queue.logger.Println("Attempting to connect")
		for !queue.connect(addr) {
			queue.logger.Println("Failed to connect. Retrying...")
			// Wait between attempts, but stop promptly if the queue is closed.
			select {
			case <-queue.done:
				return
			case <-time.After(reconnectDelay):
			}
		}
		select {
		case <-queue.done:
			return
		case _, ok := <-queue.notifyClose:
			if !ok {
				// Closed without an error: a deliberate shutdown, not a failure.
				return
			}
		}
	}
}
// connect makes a single attempt to connect to the RabbitMQ server at addr.
// It opens a channel, puts the channel into publisher-confirm mode and
// declares the (non-durable, non-exclusive) queue. On success it installs
// the new connection and channel via changeConnection, marks the queue
// connected and returns true. On failure it logs the reason, closes
// anything it opened and returns false.
func (queue *Queue) connect(addr string) bool {
	conn, err := amqp.Dial(addr)
	if err != nil {
		queue.logger.Println("Dial failed:", err)
		return false
	}
	// fail releases the half-open connection so a failed attempt does not
	// leak a socket, then reports the failure.
	fail := func(step string, err error) bool {
		queue.logger.Println(step+" failed:", err)
		_ = conn.Close() // best effort: the attempt has already failed
		return false
	}
	ch, err := conn.Channel()
	if err != nil {
		return fail("Opening channel", err)
	}
	// Push waits for publisher confirms. Without confirm mode none arrive,
	// so treat this as a failed attempt rather than ignoring the error.
	if err := ch.Confirm(false); err != nil {
		return fail("Enabling confirms", err)
	}
	if _, err := ch.QueueDeclare(
		queue.name,
		false, // Durable
		false, // Delete when unused
		false, // Exclusive
		false, // No-wait
		nil,   // Arguments
	); err != nil {
		return fail("Declaring queue", err)
	}
	queue.changeConnection(conn, ch)
	queue.isConnected = true
	queue.logger.Println("Connected!")
	return true
}
// changeConnection installs a freshly opened connection and channel on the
// queue and registers new notifyClose / notifyConfirm listeners on that
// channel.
//
// Both listener channels are buffered. The amqp library sends these
// notifications synchronously, so a confirm or close event that arrives
// while nobody is receiving would stall the library's connection handling.
// Examples are a late confirm after Push has already timed out, or the
// confirm produced by an UnsafePush.
// NOTE(review): a buffer of one absorbs only a single unread notification;
// heavy use of UnsafePush in confirm mode still needs a confirm consumer.
func (queue *Queue) changeConnection(connection *amqp.Connection, channel *amqp.Channel) {
	queue.connection = connection
	queue.channel = channel
	queue.notifyClose = make(chan *amqp.Error, 1)
	queue.notifyConfirm = make(chan amqp.Confirmation, 1)
	queue.channel.NotifyClose(queue.notifyClose)
	queue.channel.NotifyPublish(queue.notifyConfirm)
}
// Push publishes data to the queue and blocks until the server acknowledges
// it with a publisher confirm.
//
// Retry behavior:
//   - If no ack arrives within resendDelay, or the server nacks, the message
//     is published again. The server may therefore receive duplicates.
//   - If publishing itself fails (for example while a reconnect is in
//     progress), Push waits resendDelay before retrying instead of spinning.
//
// Push returns errNotConnected if the queue is not connected when it is
// called. It returns errAlreadyClosed if the queue is closed while Push is
// still retrying.
//
// NOTE(review): confirms are matched to publishes purely by arrival order
// on the shared notifyConfirm channel (DeliveryTag is not checked), so
// concurrent Push calls can consume each other's confirms.
func (queue *Queue) Push(data []byte) error {
	if !queue.isConnected {
		return errNotConnected
	}
	for {
		if err := queue.UnsafePush(data); err != nil {
			queue.logger.Println("Push failed. Retrying...")
			select {
			case <-queue.done:
				return errAlreadyClosed
			case <-time.After(resendDelay):
			}
			continue
		}
		select {
		case confirm := <-queue.notifyConfirm:
			if confirm.Ack {
				queue.logger.Println("Push confirmed!")
				return nil
			}
		case <-time.After(resendDelay):
		}
		queue.logger.Println("Push didn't confirm. Retrying...")
	}
}
// UnsafePush publishes data to the queue without waiting for a publisher
// confirm ("fire and forget"). The message is sent with an empty exchange
// name (the default exchange) and the queue's name as routing key.
//
// It returns errNotConnected when the queue has no connection, and
// otherwise whatever error Publish reports. A nil error does not guarantee
// that the server received the message.
func (queue *Queue) UnsafePush(data []byte) error {
	if queue.isConnected {
		msg := amqp.Publishing{
			ContentType: "text/plain",
			Body:        data,
		}
		const (
			exchange  = ""
			mandatory = false
			immediate = false
		)
		return queue.channel.Publish(exchange, queue.name, mandatory, immediate, msg)
	}
	return errNotConnected
}
// Stream starts consuming from the queue and returns the delivery channel.
//
// Auto-ack is disabled. Every delivery must be Ack'ed once it has been
// processed (or Nack'ed on failure); otherwise unacknowledged messages pile
// up on the server.
//
// The consumer is registered on the AMQP channel that is current at call
// time. A reconnect installs a new channel, so callers presumably need to
// call Stream again after the queue reconnects.
// NOTE(review): confirm how the caller detects that case.
func (queue *Queue) Stream() (<-chan amqp.Delivery, error) {
	if queue.isConnected {
		const (
			consumer  = ""
			autoAck   = false
			exclusive = false
			noLocal   = false
			noWait    = false
		)
		return queue.channel.Consume(queue.name, consumer, autoAck, exclusive, noLocal, noWait, nil)
	}
	return nil, errNotConnected
}
// Close cleanly shuts the queue down. It performs these steps in order:
//
//  1. Stop the background reconnect loop.
//  2. Close the channel.
//  3. Close the connection.
//
// It returns errAlreadyClosed if the queue is not currently connected or
// has already been closed. The connection is closed even if closing the
// channel fails; the first error encountered is returned.
func (queue *Queue) Close() error {
	if !queue.isConnected {
		return errAlreadyClosed
	}
	// Signal handleReconnect before tearing the channel down. Closing the
	// channel first would wake the reconnect loop through notifyClose, and
	// it could open a fresh connection that nobody ever closes.
	select {
	case <-queue.done:
		return errAlreadyClosed
	default:
		close(queue.done)
	}
	queue.isConnected = false
	chErr := queue.channel.Close()
	connErr := queue.connection.Close()
	if chErr != nil {
		return chErr
	}
	return connErr
}
@harrisonturton

This comment has been minimized.

Copy link
Owner Author

@harrisonturton harrisonturton commented Nov 6, 2018

Some notes:

  • Push will block until the server responds with a confirm. This could take forever if the server is offline.
  • UnsafePush won't block, but the server might never receive the message. Fire & forget.
  • If resendDelay is too small, it might miss a confirm sent over a slow connection. This may result in duplicate messages.
  • Close must be called to shut down gracefully; otherwise messages might be dropped. It also stops the handleReconnect goroutine, so it must be called to avoid a goroutine (and memory) leak.
@mychell

This comment has been minimized.

Copy link

@mychell mychell commented Feb 27, 2019

Just one minor addition: I would suggest exposing a function that reports whether the queue is connected. This makes it easier to start consuming only once the queue has successfully connected.

// Connected reports whether the queue currently has a usable connection,
// for example to wait before calling Stream.
func (queue *Queue) Connected() bool {
	connected := queue.isConnected
	return connected
}
@huangjiasingle

This comment has been minimized.

Copy link

@huangjiasingle huangjiasingle commented Jun 2, 2020

If the connection is closed and then successfully re-established, a consumer that already called Stream will stop working. So once the reconnect completes, Stream should be called again.

@mshddev

This comment has been minimized.

Copy link

@mshddev mshddev commented Nov 27, 2020

Hi @harrisonturton, first of all, thanks for sharing the code. It's really useful. However, I spotted a possible race condition on the isConnected variable, as it is accessed by the handleReconnect goroutine and by other functions as well. I would suggest adding a mutex to address this issue.

@harrisonturton

This comment has been minimized.

Copy link
Owner Author

@harrisonturton harrisonturton commented Dec 6, 2020

@mshddev Good catch! Though isConnected is always written safely (since only one goroutine writes to it), functions like UnsafePush and Stream and Close may read stale versions of isConnected and fail to return the error response, potentially leading to lost data when it's pushed at the wrong time.

I'll try to update the code. It seems that the same bug is present in the upstream streadway/amqp repo, so I'll create an issue and tag you there.

@fho

This comment has been minimized.

Copy link

@fho fho commented Dec 8, 2020

Thanks a lot for the example.

I'm implementing an abstraction for streadway/amqp and struggle with implementing support for the confirmations.

How is it ensured in your example that when Push() is called in parallel, the notifyConfirm message is associated with the correct publishing?

Here is a rough timing diagram; I hope it makes clear what I mean :-)

goroutine1-> Push()... ... queue.channel.Publish() (message has deliveryTag 2)...confirm := <-queue.notifyConfirm() (received ack for deliveryTag 1, msg from goroutine2)
goroutine2-> Push()... queue.channel.Publish() (message has deliveryTag 1)... ... confirm := <-queue.notifyConfirm() (received NACK for deliveryTag 2, msg from goroutine1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment