-
-
Save ribice/20951bd1c84d714ff2476465c0c0653f to your computer and use it in GitHub Desktop.
// Keep consuming until Stream stops for a reason other than a dropped
// connection. ErrDisconnected means the client is reconnecting, so Stream is
// simply invoked again.
go func() {
	for {
		if err = rmq.Stream(cancelCtx); !errors.Is(err, rabbitmq.ErrDisconnected) {
			return
		}
	}
}()
var ( | |
ErrDisconnected = errors.New("disconnected from rabbitmq, trying to reconnect") | |
) | |
const ( | |
// When reconnecting to the server after connection failure | |
reconnectDelay = 5 * time.Second | |
// When resending messages the server didn't confirm | |
resendDelay = 5 * time.Second | |
) | |
// Client holds necessery information for rabbitMQ | |
type Client struct { | |
pushQueue string | |
streamQueue string | |
logger zerolog.Logger | |
connection *amqp.Connection | |
channel *amqp.Channel | |
done chan os.Signal | |
notifyClose chan *amqp.Error | |
notifyConfirm chan amqp.Confirmation | |
isConnected bool | |
alive bool | |
threads int | |
wg *sync.WaitGroup | |
} | |
// New builds a Client for the given stream (consumed) and push (published)
// queue names and starts connecting to the RabbitMQ server at addr in a
// background goroutine (handleReconnect). The returned client is usable right
// away: until the first connection succeeds, Push and UnsafePush report an
// error and Stream waits. Receiving a value on done stops the reconnect loop.
//
// The number of consumers Stream starts is the larger of GOMAXPROCS and the
// number of logical CPUs.
func New(streamQueue, pushQueue, addr string, l zerolog.Logger, done chan os.Signal) *Client {
	workers := runtime.NumCPU()
	if procs := runtime.GOMAXPROCS(0); procs > workers {
		workers = procs
	}

	c := &Client{
		pushQueue:   pushQueue,
		streamQueue: streamQueue,
		logger:      l,
		done:        done,
		threads:     workers,
		alive:       true,
		wg:          new(sync.WaitGroup),
	}
	go c.handleReconnect(addr)
	return c
}
// handleReconnect owns the connection lifecycle; New starts it in its own
// goroutine. Each pass marks the client disconnected and calls connect until
// it succeeds, waiting reconnectDelay plus one extra second per failed
// attempt. Once connected it blocks until the channel reports closure on
// notifyClose (start another pass) or a value arrives on done (stop). The
// loop also ends once Close has cleared alive.
func (c *Client) handleReconnect(addr string) {
	// Never print addr itself: AMQP URIs normally embed the password.
	target := redactAMQPAddr(addr)
	for c.alive {
		c.isConnected = false
		t := time.Now()
		fmt.Printf("Attempting to connect to rabbitMQ: %s\n", target)
		var retryCount int
		for !c.connect(addr) {
			if !c.alive {
				return
			}
			select {
			case <-c.done:
				return
			case <-time.After(reconnectDelay + time.Duration(retryCount)*time.Second):
				c.logger.Printf("disconnected from rabbitMQ and failed to connect")
				retryCount++
			}
		}
		c.logger.Printf("Connected to rabbitMQ in: %vms", time.Since(t).Milliseconds())
		select {
		case <-c.done:
			return
		case <-c.notifyClose:
		}
	}
}

// redactAMQPAddr renders an AMQP URI for logging with its username and
// password removed. Input that cannot be parsed yields a fixed placeholder
// rather than the raw string, which might still contain credentials.
func redactAMQPAddr(addr string) string {
	uri, err := amqp.ParseURI(addr)
	if err != nil {
		return "<unparseable AMQP URI>"
	}
	return fmt.Sprintf("%s://%s:%d (vhost %q)", uri.Scheme, uri.Host, uri.Port, uri.Vhost)
}
// connect will make a single attempt to connect to | |
// RabbitMq. It returns the success of the attempt. | |
func (c *Client) connect(addr string) bool { | |
conn, err := amqp.Dial(addr) | |
if err != nil { | |
c.logger.Printf("failed to dial rabbitMQ server: %v", err) | |
return false | |
} | |
ch, err := conn.Channel() | |
if err != nil { | |
c.logger.Printf("failed connecting to channel: %v", err) | |
return false | |
} | |
ch.Confirm(false) | |
_, err = ch.QueueDeclare( | |
c.streamQueue, | |
true, // Durable | |
false, // Delete when unused | |
false, // Exclusive | |
false, // No-wait | |
nil, // Arguments | |
) | |
if err != nil { | |
c.logger.Printf("failed to declare stream queue: %v", err) | |
return false | |
} | |
_, err = ch.QueueDeclare( | |
c.pushQueue, | |
true, // Durable | |
false, // Delete when unused | |
false, // Exclusive | |
false, // No-wait | |
nil, // Arguments | |
) | |
if err != nil { | |
c.logger.Printf("failed to declare push queue: %v", err) | |
return false | |
} | |
c.changeConnection(conn, ch) | |
c.isConnected = true | |
return true | |
} | |
// changeConnection takes a new connection to the queue, | |
// and updates the channel listeners to reflect this. | |
func (c *Client) changeConnection(connection *amqp.Connection, channel *amqp.Channel) { | |
c.connection = connection | |
c.channel = channel | |
c.notifyClose = make(chan *amqp.Error) | |
c.notifyConfirm = make(chan amqp.Confirmation) | |
c.channel.NotifyClose(c.notifyClose) | |
c.channel.NotifyPublish(c.notifyConfirm) | |
} | |
// Push publishes data to the push queue and blocks until the broker positively
// acknowledges it. If no confirm arrives within resendDelay, or the broker
// nacks the message, the message is published again. It may therefore be
// delivered more than once.
//
// Push fails immediately when the client is not connected at call time. If
// the connection drops while Push is retrying, it waits for handleReconnect to
// restore it, polling once per second like Stream does, instead of spinning.
// Any other publish error is returned unchanged.
func (c *Client) Push(data []byte) error {
	if !c.isConnected {
		return errors.New("failed to push: not connected")
	}
	for {
		if err := c.UnsafePush(data); err != nil {
			if !errors.Is(err, ErrDisconnected) {
				return err
			}
			// Retrying immediately would busy-loop on ErrDisconnected until
			// the reconnect completes.
			time.Sleep(time.Second)
			continue
		}
		select {
		case confirm := <-c.notifyConfirm:
			if confirm.Ack {
				return nil
			}
		case <-time.After(resendDelay):
		}
	}
}
// UnsafePush will push to the queue without checking for | |
// confirmation. It returns an error if it fails to connect. | |
// No guarantees are provided for whether the server will | |
// receive the message. | |
func (c *Client) UnsafePush(data []byte) error { | |
if !c.isConnected { | |
return ErrDisconnected | |
} | |
return c.channel.Publish( | |
"", // Exchange | |
c.pushQueue, // Routing key | |
false, // Mandatory | |
false, // Immediate | |
amqp.Publishing{ | |
ContentType: "text/plain", | |
Body: data, | |
}, | |
) | |
} | |
// Stream starts c.threads consumers on the stream queue and blocks until all
// of them have stopped. Deliveries are not auto-acked; parseEvent settles each
// one, and a prefetch count of 1 is requested via Qos.
//
// Stream first waits, polling once per second, until the client is connected.
// It returns:
//   - nil if cancelCtx is cancelled during that wait, or after cancelCtx is
//     cancelled while consuming;
//   - ErrDisconnected if any consumer's delivery channel was closed (the
//     caller is expected to call Stream again);
//   - the error from Qos or Consume.
//
// Each consumer goroutine is added to c.wg individually, right before it is
// started. This keeps the counter balanced across repeated calls and early
// returns, so Close's c.wg.Wait() can complete.
func (c *Client) Stream(cancelCtx context.Context) error {
	for !c.isConnected {
		select {
		case <-cancelCtx.Done():
			return nil
		case <-time.After(time.Second):
		}
	}

	if err := c.channel.Qos(1, 0, false); err != nil {
		return err
	}

	var (
		mu                sync.Mutex // serializes writes from the consumer goroutines
		connectionDropped bool
	)
	for i := 1; i <= c.threads; i++ {
		msgs, err := c.channel.Consume(
			c.streamQueue,
			consumerName(i), // Consumer
			false,           // Auto-Ack
			false,           // Exclusive
			false,           // No-local
			false,           // No-Wait
			nil,             // Args
		)
		if err != nil {
			return err
		}
		c.wg.Add(1)
		go func() {
			defer c.wg.Done()
			for {
				select {
				case <-cancelCtx.Done():
					return
				case msg, ok := <-msgs:
					if !ok {
						mu.Lock()
						connectionDropped = true
						mu.Unlock()
						return
					}
					c.parseEvent(msg)
				}
			}
		}()
	}
	// Wait orders the read below after every goroutine's write.
	c.wg.Wait()
	if connectionDropped {
		return ErrDisconnected
	}
	return nil
}
// event is the JSON body of a stream-queue message. Job selects how the
// message is processed; Data is its payload and parseEvent rejects events
// where it is empty.
type event struct {
	Job  string `json:"job"`
	Data string `json:"data"`
}
func (c *Client) parseEvent(msg amqp.Delivery) { | |
l := c.logger.Log().Timestamp() | |
startTime := time.Now() | |
var evt event | |
err := json.Unmarshal(msg.Body, &evt) | |
if err != nil { | |
logAndNack(msg, l, startTime, "unmarshalling body: %s - %s", string(msg.Body), err.Error()) | |
return | |
} | |
if evt.Data == "" { | |
logAndNack(msg, l, startTime, "received event without data") | |
return | |
} | |
defer func(ctx context.Context, e event, m amqp.Delivery, logger *zerolog.Event) { | |
if err := recover(); err != nil { | |
stack := make([]byte, 8096) | |
stack = stack[:runtime.Stack(stack, false)] | |
l.Bytes("stack", stack).Str("level", "fatal").Interface("error", err).Msg("panic recovery for rabbitMQ message") | |
msg.Nack(false, false) | |
} | |
}(ctx, evt, msg, l) | |
switch evt.Job { | |
case "job1": | |
// Call an actual function | |
err = func() | |
case "job1": | |
err = func() | |
default: | |
msg.Reject(false) | |
return | |
} | |
if err != nil { | |
logAndNack(msg, l, startTime, err.Error()) | |
return | |
} | |
l.Str("level", "info").Int64("took-ms", time.Since(startTime).Milliseconds()).Msgf("%s succeeded", evt.Job) | |
msg.Ack(false) | |
} | |
// logAndNack negatively acknowledges msg without requeueing it. It then
// finishes the log event l at "error" level with the elapsed time since t and
// a message built from format and args. A failed nack is recorded on the same
// log line under "nack-error" instead of being silently dropped.
func logAndNack(msg amqp.Delivery, l *zerolog.Event, t time.Time, format string, args ...interface{}) {
	nackErr := msg.Nack(false, false)
	l.Int64("took-ms", time.Since(t).Milliseconds()).
		Str("level", "error").
		AnErr("nack-error", nackErr).
		Msgf(format, args...)
}
// Close shuts the client down. It always clears alive, so handleReconnect
// stops retrying even when Close is called while disconnected. If connected,
// it then does the following:
//   - waits for Stream's consumer goroutines to finish (they exit only when
//     the context passed to Stream is cancelled or their delivery channels
//     close, so cancel that context first);
//   - cancels every consumer;
//   - closes the channel and then the connection.
//
// The first error encountered is returned, wrapped with context.
func (c *Client) Close() error {
	// Clear alive before the connection check. Otherwise closing a
	// disconnected client leaves handleReconnect reconnecting forever.
	c.alive = false
	if !c.isConnected {
		return nil
	}
	fmt.Println("Waiting for current messages to be processed...")
	c.wg.Wait()
	for i := 1; i <= c.threads; i++ {
		fmt.Println("Closing consumer: ", i)
		if err := c.channel.Cancel(consumerName(i), false); err != nil {
			return fmt.Errorf("error canceling consumer %s: %w", consumerName(i), err)
		}
	}
	if err := c.channel.Close(); err != nil {
		return fmt.Errorf("closing rabbitMQ channel: %w", err)
	}
	if err := c.connection.Close(); err != nil {
		return fmt.Errorf("closing rabbitMQ connection: %w", err)
	}
	c.isConnected = false
	fmt.Println("gracefully stopped rabbitMQ connection")
	return nil
}
func consumerName(i int) string { | |
return fmt.Sprintf("go-consumer-%v", i) | |
} |
Wow, your code looks great. I am starting to use Go for my personal projects with REST APIs, and I was looking for more in-depth proofs of concept.
@ribice Can you look at this, please? I do not understand your handleReconnect logic. How do I properly instantiate my client so it can be used any time I need it? https://gist.github.com/s0j0hn/06492e218e51bfd67421450aa66c586c
// goChan is passed to New as the client's done channel.
// NOTE(review): nothing visible here registers it with signal.Notify. This
// goroutine also receives from it, so a single value is consumed either here
// or by the client's reconnect loop, never both.
goChan := make(chan os.Signal, 1)
clientV2 := V2.New("listenqueue", "pushqueue", config.GetRabbitMQAccess(), log.Logger, goChan)
go func() {
for {
select {
case <- goChan:
taskBytes := rabbitmq.CreateNewTaskV2([]string{"test", "test2"}, "Status is OK")
// err is assigned (not declared) here, so it is a variable from the enclosing scope.
err = clientV2.UnsafePush(taskBytes)
if err != nil {
echoServer.Logger.Fatal(err)
// NOTE(review): presumably unreachable if Logger.Fatal already exits the process; confirm.
os.Exit(1)
}
return
// NOTE(review): with a default case this select never blocks, so the loop
// spins a CPU core until a value arrives on goChan.
default:
// Do other stuff
}
}
}()
EDIT: I updated my version and everything works
@ribice, thank you very much for the example. I would just like to add a note about the following error, which could occur after reconnecting to a RabbitMQ server:
panic: sync: negative WaitGroup counter
When the connection drops, c.wg.Done()
on L222 will decrease the counter in the WaitGroup to 0. After reconnecting to the server, the counter remains 0 because there isn't a c.wg.Add()
call to increase it. When the connection drops again or you try to shut down the application, c.wg.Done()
is called again and a panic occurs.
I have managed to fix the problem by deleting client.wg.Add(threads)
from L40 and putting c.wg.Add(1)
before starting the goroutine in the Stream()
function.
Cheers 🍻
There's a duplicated UnsafePush() method https://gist.github.com/ribice/20951bd1c84d714ff2476465c0c0653f#file-rabbitmq-go-L156-L190
Hi, where does c.streamQueue come from?
Thanks for reporting @sundowndev and @mshddev, I fixed both issues.
@dikkini: Thanks, addressed those.
What is recover()
at L259?
Another thing: is it safe to use plain, unsynchronized fields to communicate between the goroutines? I mean alive
and isConnected.
@ribice, thank you very much for the example. I would just like to add a note about the following error, which could occur after reconnecting to a RabbitMQ server:
panic: sync: negative WaitGroup counter
When the connection drops,
c.wg.Done()
on L222 will decrease the counter in the WaitGroup to 0. After reconnecting to the server, the counter remains 0 because there isn't a c.wg.Add()
call to increase it. When the connection drops again or you try to shut down the application, c.wg.Done()
is called again and a panic occurs. I have managed to fix the problem by deleting
client.wg.Add(threads)
from L40 and putting c.wg.Add(1)
before starting the goroutine in the Stream()
function. Cheers 🍻
Awesome fix.
@ribice, thank you very much for the example. I would just like to add a note about the following error, which could occur after reconnecting to a RabbitMQ server:
panic: sync: negative WaitGroup counter
When the connection drops,
c.wg.Done()
on L222 will decrease the counter in the WaitGroup to 0. After reconnecting to the server, the counter remains 0 because there isn't a c.wg.Add()
call to increase it. When the connection drops again or you try to shut down the application, c.wg.Done()
is called again and a panic occurs. I have managed to fix the problem by deleting
client.wg.Add(threads)
from L40 and putting c.wg.Add(1)
before starting the goroutine in the Stream()
function. Cheers 🍻
Thanks. I noticed that I did this change long ago on my environment, but forgot to update Gist. Will do that today.
I update Gist, hope that's correct now.
Hey, how should I use this library in an architecture that consists of 1 producer and 3 subscribers? Basically, my producer microservice will publish a message, and all 3 receiver services will consume it. There will be only one queue, and it is of fanout type. How should I structure my code around this library? It looks a bit complicated to me.
Could you explain in a bit more detail how the handleReconnect
function works? And why do we need both the alive
and isConnected
booleans?
L181: I assume threads
should be c.threads?
Thanks for noticing, fixed this! The receiver in my actual code is named
q,
not c, hence this typo.