Skip to content

Instantly share code, notes, and snippets.

@ribice

ribice/caller.go

Last active Apr 10, 2021
Embed
What would you like to do?
A robust rabbitmq client for Go
// Consume in the background. Stream is restarted every time it reports a
// dropped connection; any other result (including nil) ends the goroutine.
go func() {
	for {
		err = rmq.Stream(cancelCtx)
		if !errors.Is(err, rabbitmq.ErrDisconnected) {
			return
		}
	}
}()
// ErrDisconnected is returned by UnsafePush when the client is not connected
// and by Stream when a consumer's delivery channel is closed.
var ErrDisconnected = errors.New("disconnected from rabbitmq, trying to reconnect")
// reconnectDelay is the base wait between attempts to reconnect to the
// server after a connection failure.
const reconnectDelay = 5 * time.Second

// resendDelay is how long Push waits for a server confirmation before it
// publishes the message again.
const resendDelay = 5 * time.Second
// Client holds the necessary information for talking to rabbitMQ: the queue
// names, the current connection and channel with their notification
// listeners, and the bookkeeping used to run and stop the consumers.
type Client struct {
	// Queue consumed by Stream and queue published to by Push/UnsafePush.
	streamQueue string
	pushQueue   string

	logger zerolog.Logger

	// Current AMQP connection and channel; replaced by changeConnection
	// together with the two listeners below.
	connection    *amqp.Connection
	channel       *amqp.Channel
	notifyClose   chan *amqp.Error
	notifyConfirm chan amqp.Confirmation

	// done is supplied by the caller of New; receiving from it stops the
	// reconnect loop.
	done chan os.Signal
	// isConnected is set by connect and cleared by handleReconnect and Close.
	isConnected bool
	// alive is true from New until Close; the reconnect loop runs while it is set.
	alive bool

	// threads is the number of consumers Stream starts; wg tracks them.
	threads int
	wg      *sync.WaitGroup
}
// New builds a Client for the given stream (consume) and push (publish) queue
// names and starts connecting to the server at addr in the background.
//
// The number of consumer threads is the larger of GOMAXPROCS and the number
// of CPUs. The client's WaitGroup is pre-loaded with that many slots, one per
// consumer that Stream will start. A receive on done stops the background
// reconnect loop. The connection itself, including declaring both queues, is
// established by handleReconnect, so the returned client may not be connected
// yet.
func New(streamQueue, pushQueue, addr string, l zerolog.Logger, done chan os.Signal) *Client {
	workers := runtime.NumCPU()
	if procs := runtime.GOMAXPROCS(0); procs > workers {
		workers = procs
	}

	c := &Client{
		streamQueue: streamQueue,
		pushQueue:   pushQueue,
		logger:      l,
		done:        done,
		alive:       true,
		threads:     workers,
		wg:          &sync.WaitGroup{},
	}
	c.wg.Add(workers)

	go c.handleReconnect(addr)
	return c
}
// handleReconnect keeps the client connected for as long as it is alive.
// Each round marks the client as disconnected, retries connect with a delay
// that grows by one second per failed attempt, and then blocks until the
// channel reports a close on notifyClose. A receive on done ends the loop at
// any point.
func (c *Client) handleReconnect(addr string) {
	for c.alive {
		c.isConnected = false
		started := time.Now()
		fmt.Printf("Attempting to connect to rabbitMQ: %s\n", addr)

		for attempt := 0; !c.connect(addr); attempt++ {
			if !c.alive {
				return
			}
			backoff := reconnectDelay + time.Duration(attempt)*time.Second
			select {
			case <-c.done:
				return
			case <-time.After(backoff):
				c.logger.Printf("disconnected from rabbitMQ and failed to connect")
			}
		}

		elapsed := time.Since(started).Milliseconds()
		c.logger.Printf("Connected to rabbitMQ in: %vms", elapsed)

		// Stay here until shutdown is requested or the channel closes.
		select {
		case <-c.done:
			return
		case <-c.notifyClose:
		}
	}
}
// connect makes a single attempt to connect to RabbitMQ and reports whether
// it succeeded.
//
// On success the new connection and channel are installed through
// changeConnection and the client is marked as connected. The channel is put
// into confirm mode and both the stream and the push queue are declared as
// durable queues before the hand-over.
//
// On any failure after the dial the freshly opened connection is closed
// again. handleReconnect retries indefinitely, so leaving it open would leak
// one connection per failed attempt.
func (c *Client) connect(addr string) bool {
	conn, err := amqp.Dial(addr)
	if err != nil {
		c.logger.Printf("failed to dial rabbitMQ server: %v", err)
		return false
	}

	// Until the connection has been handed over to the client, every early
	// return must release it (closing the connection also closes its channels).
	handedOver := false
	defer func() {
		if handedOver {
			return
		}
		if cerr := conn.Close(); cerr != nil {
			c.logger.Printf("failed to close rabbitMQ connection after setup error: %v", cerr)
		}
	}()

	ch, err := conn.Channel()
	if err != nil {
		c.logger.Printf("failed connecting to channel: %v", err)
		return false
	}

	// Push blocks waiting for publisher confirms, so a channel that could not
	// be switched to confirm mode must not be used.
	if err := ch.Confirm(false); err != nil {
		c.logger.Printf("failed to enable publisher confirms: %v", err)
		return false
	}

	queues := []struct{ role, name string }{
		{"stream", c.streamQueue},
		{"push", c.pushQueue},
	}
	for _, q := range queues {
		_, err := ch.QueueDeclare(
			q.name,
			true,  // Durable
			false, // Delete when unused
			false, // Exclusive
			false, // No-wait
			nil,   // Arguments
		)
		if err != nil {
			c.logger.Printf("failed to declare %s queue: %v", q.role, err)
			return false
		}
	}

	c.changeConnection(conn, ch)
	c.isConnected = true
	handedOver = true
	return true
}
// changeConnection installs a new connection and channel on the client and
// registers fresh close/confirm listeners on the channel.
//
// A previously installed connection is closed first (best effort). A
// channel-level close leaves the old connection open, and without this every
// reconnect would leak it.
//
// Both listeners are buffered. The AMQP library delivers notifications
// synchronously, so an unbuffered listener that nobody is currently reading
// (for example confirmations produced by UnsafePush) would block the library.
func (c *Client) changeConnection(connection *amqp.Connection, channel *amqp.Channel) {
	if old := c.connection; old != nil && old != connection {
		// The error is deliberately ignored: an already closed connection
		// reports an error here and there is nothing left to clean up.
		_ = old.Close()
	}

	c.connection = connection
	c.channel = channel
	c.notifyClose = make(chan *amqp.Error, 1)
	c.notifyConfirm = make(chan amqp.Confirmation, 1)
	c.channel.NotifyClose(c.notifyClose)
	c.channel.NotifyPublish(c.notifyConfirm)
}
// Push publishes data to the push queue and waits for the server to confirm
// it. If no positive confirmation arrives within resendDelay, the message is
// published again, and this repeats until the server acknowledges it. The
// call therefore blocks until a confirmation is received.
//
// It returns an error immediately when the client is not connected at call
// time, and returns any publish error other than ErrDisconnected. If the
// connection is lost while Push is already running, it waits resendDelay and
// retries instead of spinning.
func (c *Client) Push(data []byte) error {
	if !c.isConnected {
		return errors.New("failed to push: not connected")
	}
	for {
		err := c.UnsafePush(data)
		switch {
		case errors.Is(err, ErrDisconnected):
			// The reconnect loop needs time to restore the connection;
			// retrying without a pause would burn a CPU core.
			time.Sleep(resendDelay)
			continue
		case err != nil:
			return err
		}

		select {
		case confirm := <-c.notifyConfirm:
			if confirm.Ack {
				return nil
			}
			// Negative acknowledgement: publish again.
		case <-time.After(resendDelay):
			// No confirmation in time: publish again.
		}
	}
}
// UnsafePush publishes data to the push queue through the default exchange
// without waiting for a confirmation, so there is no guarantee that the
// server received the message. It returns ErrDisconnected when the client is
// not connected, otherwise the result of the publish call.
func (c *Client) UnsafePush(data []byte) error {
	if c.isConnected {
		message := amqp.Publishing{
			ContentType: "text/plain",
			Body:        data,
		}
		// Arguments: exchange, routing key, mandatory, immediate, message.
		return c.channel.Publish("", c.pushQueue, false, false, message)
	}
	return ErrDisconnected
}
// Stream consumes the stream queue with c.threads consumers and blocks until
// all of them have stopped.
//
// It first waits for the client to be connected, then sets a prefetch of one
// message per consumer and registers the consumers. Every delivery is handed
// to parseEvent.
//
// Return values:
//   - nil when cancelCtx is cancelled (also while still waiting to connect);
//   - ErrDisconnected when a delivery channel was closed and cancelCtx is
//     still active, in which case the caller is expected to call Stream again;
//   - any error from Qos or Consume.
//
// WaitGroup contract: New pre-loads c.wg with c.threads slots and each
// consumer releases one. Stream therefore expects c.threads outstanding slots
// on entry. It leaves zero slots behind when it returns nil and re-arms
// c.threads slots before returning ErrDisconnected, so a retried Stream does
// not drive the counter negative.
func (c *Client) Stream(cancelCtx context.Context) error {
	// Wait for the reconnect loop, but do not outlive the caller's context.
	for !c.isConnected {
		select {
		case <-cancelCtx.Done():
			// No consumer will run, so release the slots they would have
			// released; otherwise Close would wait on them forever.
			c.wg.Add(-c.threads)
			return nil
		case <-time.After(time.Second):
		}
	}

	if err := c.channel.Qos(1, 0, false); err != nil {
		return err
	}

	// Register every consumer before starting any worker, so a failure
	// half-way through leaves no goroutine behind and the WaitGroup untouched.
	deliveries := make([]<-chan amqp.Delivery, 0, c.threads)
	for i := 1; i <= c.threads; i++ {
		msgs, err := c.channel.Consume(
			c.streamQueue,
			consumerName(i), // Consumer
			false,           // Auto-Ack
			false,           // Exclusive
			false,           // No-local
			false,           // No-Wait
			nil,             // Args
		)
		if err != nil {
			// Best effort: undo the consumers that were already registered.
			for j := 1; j < i; j++ {
				_ = c.channel.Cancel(consumerName(j), false)
			}
			return err
		}
		deliveries = append(deliveries, msgs)
	}

	// Each worker reports a closed delivery channel here. The buffer holds
	// one entry per worker, so the send never blocks and no shared flag is
	// written concurrently.
	dropped := make(chan struct{}, len(deliveries))
	for _, msgs := range deliveries {
		go func(msgs <-chan amqp.Delivery) {
			defer c.wg.Done()
			for {
				select {
				case <-cancelCtx.Done():
					return
				case msg, ok := <-msgs:
					if !ok {
						dropped <- struct{}{}
						return
					}
					c.parseEvent(msg)
				}
			}
		}(msgs)
	}
	c.wg.Wait()

	// A drop during shutdown is not worth a retry.
	if len(dropped) == 0 || cancelCtx.Err() != nil {
		return nil
	}

	// Re-arm the slots for the consumers of the next Stream call.
	c.wg.Add(c.threads)
	return ErrDisconnected
}
// event is the JSON payload decoded from a delivery body by parseEvent.
type event struct {
	// Job selects which handler parseEvent runs.
	Job string `json:"job"`
	// Data is the job's payload; parseEvent rejects events where it is empty.
	Data string `json:"data"`
}
// parseEvent decodes a delivery body as a JSON event, runs the handler that
// matches evt.Job and settles the delivery: Ack on success, Nack (without
// requeue) on a decode error, empty data, handler error or panic, and Reject
// (without requeue) for an unknown job.
//
// NOTE(review): this is template code and does not compile as written; see
// the notes at the deferred call and in the switch below.
func (c *Client) parseEvent(msg amqp.Delivery) {
// One log event is built up for the whole delivery and emitted exactly once.
l := c.logger.Log().Timestamp()
startTime := time.Now()
var evt event
err := json.Unmarshal(msg.Body, &evt)
if err != nil {
logAndNack(msg, l, startTime, "unmarshalling body: %s - %s", string(msg.Body), err.Error())
return
}
if evt.Data == "" {
logAndNack(msg, l, startTime, "received event without data")
return
}
// Recover from a panic in a job handler: log the stack trace and Nack the
// delivery without requeueing it.
// NOTE(review): ctx is not defined anywhere in this function, and the
// closure ignores its parameters and uses the captured l and msg instead.
defer func(ctx context.Context, e event, m amqp.Delivery, logger *zerolog.Event) {
if err := recover(); err != nil {
stack := make([]byte, 8096)
// runtime.Stack returns the number of bytes written; trim to that length.
stack = stack[:runtime.Stack(stack, false)]
l.Bytes("stack", stack).Str("level", "fatal").Interface("error", err).Msg("panic recovery for rabbitMQ message")
msg.Nack(false, false)
}
}(ctx, evt, msg, l)
switch evt.Job {
case "job1":
// Placeholder: call the actual handler for this job here.
// NOTE(review): `func()` is not a valid expression.
err = func()
// NOTE(review): duplicate case label; presumably meant to be a second job name.
case "job1":
err = func()
default:
// Unknown job: reject without requeue; nothing is logged on this path.
msg.Reject(false)
return
}
if err != nil {
logAndNack(msg, l, startTime, err.Error())
return
}
l.Str("level", "info").Int64("took-ms", time.Since(startTime).Milliseconds()).Msgf("%s succeeded", evt.Job)
msg.Ack(false)
}
// logAndNack negatively acknowledges msg (single delivery, no requeue) and
// emits l as an error-level log entry carrying the time elapsed since t.
//
// err is the log message. It is treated as a format string only when args
// are supplied; with no args it is logged verbatim, so a message that
// contains '%' (for example an error text passed through err.Error()) is not
// mangled by the formatter. A failing Nack is attached to the log entry
// instead of being dropped.
func logAndNack(msg amqp.Delivery, l *zerolog.Event, t time.Time, err string, args ...interface{}) {
	if nackErr := msg.Nack(false, false); nackErr != nil {
		l = l.AnErr("nack-error", nackErr)
	}

	text := err
	if len(args) > 0 {
		text = fmt.Sprintf(err, args...)
	}
	l.Int64("took-ms", time.Since(t).Milliseconds()).Str("level", "error").Msg(text)
}
// Close shuts the client down.
//
// It always marks the client as no longer alive so the reconnect loop stops,
// even when the client is currently disconnected. In that case there is
// nothing else to release and nil is returned.
//
// When connected it waits for the consumers tracked by the WaitGroup to
// finish, cancels every consumer, and closes the channel and the connection.
// Teardown continues after a failure so nothing stays open; the first error
// encountered is returned.
//
// The consumers only stop when the context passed to Stream is cancelled or
// their delivery channel closes, so cancel that context before calling Close.
func (c *Client) Close() error {
	c.alive = false
	if !c.isConnected {
		return nil
	}

	fmt.Println("Waiting for current messages to be processed...")
	c.wg.Wait()

	var firstErr error
	keep := func(err error) {
		if err != nil && firstErr == nil {
			firstErr = err
		}
	}

	for i := 1; i <= c.threads; i++ {
		fmt.Println("Closing consumer: ", i)
		if err := c.channel.Cancel(consumerName(i), false); err != nil {
			keep(fmt.Errorf("error canceling consumer %s: %w", consumerName(i), err))
		}
	}
	keep(c.channel.Close())
	keep(c.connection.Close())
	c.isConnected = false

	if firstErr != nil {
		return firstErr
	}
	fmt.Println("gracefully stopped rabbitMQ connection")
	return nil
}
// consumerName returns the consumer tag used for the i-th consumer,
// e.g. "go-consumer-1".
func consumerName(i int) string {
	const prefix = "go-consumer-"
	return prefix + fmt.Sprint(i)
}
@kamal-github

This comment has been minimized.

Copy link

@kamal-github kamal-github commented Sep 26, 2020

I believe that in Push(), at L122, you return an error when the client is not connected, but if the connection fails right after L122, we keep waiting until the message is published and confirmed. So sometimes it will return an error, and other times it will reconnect and eventually publish successfully.

@lukaszraczylo

This comment has been minimized.

Copy link

@lukaszraczylo lukaszraczylo commented Oct 11, 2020

What's the q in L#55?

@ribice

This comment has been minimized.

Copy link
Owner Author

@ribice ribice commented Oct 11, 2020

What's the q in L#55?

Thanks for noticing, fixed this! Receiver in my actual code is named q and not c, thus this typo.

@s0j0hn

This comment has been minimized.

Copy link

@s0j0hn s0j0hn commented Oct 18, 2020

Wow, your code looks great. I am starting to use go for my personal projects with rest api and i was looking for more in depth proof of concepts

@s0j0hn

This comment has been minimized.

Copy link

@s0j0hn s0j0hn commented Oct 21, 2020

@ribice Can you look at this, please? I do not understand your handleReconnect logic. How do I properly instantiate my client so it can be used any time I need it? https://gist.github.com/s0j0hn/06492e218e51bfd67421450aa66c586c

	goChan := make(chan os.Signal, 1)
	clientV2 := V2.New("listenqueue", "pushqueue", config.GetRabbitMQAccess(), log.Logger, goChan)

	go func() {
		for {
			select {
			case <- goChan:
				taskBytes := rabbitmq.CreateNewTaskV2([]string{"test", "test2"}, "Status is OK")
				err = clientV2.UnsafePush(taskBytes)
				if err != nil {
					echoServer.Logger.Fatal(err)
					os.Exit(1)
				}
				return
			default:
				// Do other stuff
			}
		}
	}()

EDIT: I updated my version and everything works

@nenadvasic

This comment has been minimized.

Copy link

@nenadvasic nenadvasic commented Nov 16, 2020

@ribice, Thank you very much for the example. I would like just to add a note about the following error which could occur after reconnecting to a RabbitMQ server:

panic: sync: negative WaitGroup counter

When the connection drops, c.wg.Done() on L222 will decrease the counter in WaitGroup to 0. After reconnecting to the server, the counter remains 0 because there isn't a c.wg.Add() call to increase it. When the connection drops again or you try to shut down the application, c.wg.Done() is called again and panic occurs.

I have managed to fix the problem by deleting client.wg.Add(threads) from L40 and putting c.wg.Add(1) before calling the goroutine in Stream() function.

Cheers 🍻

@sundowndev

This comment has been minimized.

Copy link

@sundowndev sundowndev commented Nov 24, 2020

@mshddev

This comment has been minimized.

Copy link

@mshddev mshddev commented Nov 24, 2020

Hi, where does this c.streamQueue come from?

@ribice

This comment has been minimized.

Copy link
Owner Author

@ribice ribice commented Dec 2, 2020

Thanks for reporting @sundowndev and @mshddev, I fixed both issues.

@dikkini

This comment has been minimized.

@ribice

This comment has been minimized.

Copy link
Owner Author

@ribice ribice commented Feb 10, 2021

@dikkini: Thanks, addressed those.

@Future2100

This comment has been minimized.

Copy link

@Future2100 Future2100 commented Apr 9, 2021

What is recover() at L259?

Another thing: can we use raw types to communicate between goroutines? I mean the alive and isConnected fields.

@ribice

This comment has been minimized.

Copy link
Owner Author

@ribice ribice commented Apr 10, 2021

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment