Skip to content

Instantly share code, notes, and snippets.

@FX-HAO
Last active July 23, 2020 03:59
Show Gist options
  • Save FX-HAO/6301c7fd7d7c92a83208b1c78ea6d82a to your computer and use it in GitHub Desktop.
Save FX-HAO/6301c7fd7d7c92a83208b1c78ea6d82a to your computer and use it in GitHub Desktop.
Exponential backoff for [sneakers](https://github.com/jondot/sneakers)
module Sneakers
module Handlers
class ExponentialBackoffHandler
def initialize(channel, queue, opts)
@channel = channel
@opts = opts
@worker_queue_name = queue.name
@max_retries = @opts[:retry_max_times] || 3
retry_name = @opts[:retry_exchange] || "#{@worker_queue_name}-retry"
@retry_exchange = @channel.exchange(
retry_name,
:type => 'x-delayed-message',
:durable => exchange_durable?,
arguments => { "x-delayed-type" => "direct" }
)
Sneakers.logger.debug { "#{log_prefix} creating exchange=#{retry_name}" }
queue.bind(@retry_exchange, :routing_key => '#')
end
def acknowledge(hdr, props, msg)
@channel.acknowledge(hdr.delivery_tag, false)
end
def reject(hdr, props, msg, requeue=false)
if requeue
# This was explicitly rejected specifying it be requeued so we DO
# want it to pass through our retry logic.
handle_retry(hdr, props, msg, :reject)
else
@channel.reject(hdr.delivery_tag, false)
end
end
def error(hdr, props, msg, err)
reject(hdr, props, msg)
end
def noop(hdr, props, msg)
end
private
def handle_retry(hdr, props, msg, reason)
num_attempts = failure_count(props[:headers]) + 1
if num_attempts <= @max_retries
Sneakers.logger.info do
"#{log_prefix} msg=retrying, count=#{num_attempts}, headers=#{props[:headers]}"
end
@retry_exchange.publish(msg,
routing_key: hdr.routing_key,
headers: props[:headers].to_h.merge({x_retries: num_attempts, 'x-delay' => 2**(num_attempts-1) * 3000})
)
@channel.reject(hdr.delivery_tag, false)
# TODO: metrics
else
Sneakers.logger.info do
"#{log_prefix} msg=failing, retry_count=#{num_attempts}, reason=#{reason}"
end
@channel.reject(hdr.delivery_tag, false)
end
end
def failure_count(headers)
(headers && headers['x_retries']).to_i
end
def log_prefix
"Maxretry handler [queue=#{@worker_queue_name}]"
end
def queue_durable?
@opts.fetch(:queue_options, {}).fetch(:durable, false)
end
def exchange_durable?
queue_durable?
end
end
end
end
@FX-HAO
Copy link
Author

FX-HAO commented Jul 23, 2020

# Register this into Sneaker
Sneakers.configure(handler: Sneakers::Handlers::ExponentialBackoffHandler)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment