Skip to content

Instantly share code, notes, and snippets.

Created October 9, 2017 12:04
Show Gist options
  • Save mpskovvang/ab4c0532d63d4156cf18a2f01b448b25 to your computer and use it in GitHub Desktop.
Save mpskovvang/ab4c0532d63d4156cf18a2f01b448b25 to your computer and use it in GitHub Desktop.
At [Katoni]( we needed a way to handle failed messages and made this simple exponential backoff helper. Inspiration by [alphasights/sneakers_handlers](
$connection = new AMQPStreamConnection(HOST, PORT, USER, PASS);
$channel = $connection->channel();
$channel->exchange_declare('', 'direct', false, false, false);
$channel->exchange_declare('', 'direct', false, false, false, false, false);
$channel->queue_declare('example', false, true, false, false, false, [
'x-dead-letter-exchange' => ['S', ''],
'x-dead-letter-routing-key' => ['S', 'example']
$channel->queue_bind('example', '', 'example');
$channel->queue_declare('example.error', false, true, false, false, false);
$channel->queue_bind('example.error', '', 'example');
$helper = new ExponentialBackoffHelper($channel, 'example', '', 'example', [
'max_attempts' => 3
$channel->basic_qos(null, 1, null);
$channel->basic_consume('example', 'example', false, false, false, false, function ($message) use ($helper) {
echo $message->body;
while(count($channel->callbacks)) {
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Message\AMQPMessage;
class ExponentialBackoffHelper
* @var AMQPChannel
private $channel;
* @var string
private $queue;
* @var string
private $exchange;
* @var string
private $routing_key;
* @var integer $max_delay The max time (in seconds) to wait before a retry.
private $max_delay = 60;
* @var integer $factor The base number for the exponential back off.
private $factor = 2;
* @var integer $max_attempts The max number of attempts allowed.
private $max_attempts = 10;
public function __construct(AMQPChannel $channel, $queue, $exchange, $routing_key, array $options = [])
$this->channel = $channel;
$this->queue = $queue;
$this->exchange = $exchange;
$this->routing_key = $routing_key;
if (isset($options['max_delay'])) {
if ($options['max_delay'] <= 0) {
throw new \Exception("Option 'max_delay' must be greater than 0.");
$this->max_delay = (int) $options['max_delay'];
if (isset($options['factor'])) {
if ($options['factor'] <= 0) {
throw new \Exception("Option 'factor' must be greater than 0.");
$this->factor = (int) $options['factor'];
if (isset($options['max_attempts'])) {
if ($options['max_attempts'] < 0) {
throw new \Exception("Option 'max_attempts' must not be negative.");
$this->max_attempts = (int) $options['max_attempts'];
public function acknowledge(AMQPMessage $message)
$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag'], false);
public function reject(AMQPMessage $message, $requeue = false)
public function error(AMQPMessage $message)
public function timeout(AMQPMessage $message)
private function retry(AMQPMessage $message)
$attempts = $this->deaths($message);
if ($attempts < $this->max_attempts) {
$delay = $this->delay($attempts);
$routing_key = $this->queue . '.' . $delay;
$queue = $this->createRetryQueue($delay);
$this->channel->queue_bind($queue, $this->exchange, $routing_key);
$this->channel->basic_publish($message, $this->exchange, $routing_key);
} else {
$message->delivery_info['channel']->basic_reject($message->delivery_info['delivery_tag'], false);
private function deaths(AMQPMessage $message)
$headers = $message->has('application_headers') ? $message->get('application_headers')->getNativeData() : null;
if (is_null($headers) || !isset($headers['x-death'])) {
return 0;
$count = 0;
foreach ($headers['x-death'] as $death) {
if (strpos($death['queue'], $this->queue) === 0) {
$count += $death['count'];
return $count;
private function createRetryQueue($delay)
$queue = $this->queue . '.retry.' . $delay;
$this->channel->queue_declare($queue, false, true, false, false, false, [
'x-dead-letter-exchange' => ['S', $this->exchange],
'x-dead-letter-routing-key' => ['S', $this->queue],
'x-message-ttl' => ['I', $delay * 1000],
'x-expires' => ['I', $delay * 1000 * 2]
return $queue;
public function delay($attempts)
return min($this->max_delay, ($attempts + 1) ** $this->factor);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment