@andreib1
Last active April 1, 2021 21:53
Prevent a NATS message from being redelivered during processing
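import (
	"context"
	"time"

	"github.com/nats-io/nats.go"
)

// HoldMessage keeps a JetStream message from being redelivered while it is
// being processed. It calls msg.InProgress every interval, which resets the
// redelivery timer on the server.
//
// interval should be shorter than the message ack timeout. If interval is not
// positive, no keep-alive is started.
// parentCancel is optional. It is called if InProgress fails, so the caller
// can cancel processing of a message it may no longer hold.
// The caller should name the returned function release and call it after a
// successful ACK/NAK to stop the keep-alive.
func HoldMessage(ctx context.Context, msg *nats.Msg, interval time.Duration, parentCancel context.CancelFunc) context.CancelFunc {
	ctx, release := context.WithCancel(ctx)
	if interval <= 0 {
		return release
	}
	go func() {
		// Always free the context's resources when the goroutine exits.
		defer release()
		// A single ticker avoids allocating a new timer on every iteration.
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				if err := msg.InProgress(); err != nil {
					// The hold could not be extended: tell the caller and stop.
					if parentCancel != nil {
						parentCancel()
					}
					return
				}
			case <-ctx.Done():
				// Released by the caller, or the parent context ended.
				return
			}
		}
	}()
	return release
}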