Created
January 17, 2017 12:33
-
-
Save ono/107ea45734e3bbcba19a03c1466f7253 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Sample.DlxTest do
  @moduledoc """
  Sample code showing how to work with a Dead Letter Exchange on RabbitMQ.

  A Dead Letter Exchange lets you control retry logic. This module declares
  two durable queues:

    * the work queue (`"dlx"` by default), whose dead-lettered messages are
      published through the default exchange to the retry queue;
    * the retry queue (`"dlx.retry"`), which has a 10 second message TTL and
      dead-letters back to the work queue.

  The consumer rejects (nacks without requeue) any message containing `fail`,
  so the message cycles between the two queues until it has been retried
  10 times, after which it is acknowledged and dropped.

  ## Requirement

  - [AMQP](https://hex.pm/packages/amqp)

  ## How to run

      iex> import Sample.DlxTest
      iex> reset_queue() # set up queue
      iex> consumer = start_consumer() # start consumer
      iex> send_message("Hello") # Normal message
      iex> send_message("Please fail") # Simulate error
      iex> send(consumer, :close) # Stop consumer
  """

  @queue_name "dlx"
  @retry_suffix ".retry"
  @retry_ttl_ms 10_000
  @max_retries 10

  @doc """
  Declares the work queue `queue_name` and its companion retry queue
  (`queue_name <> ".retry"`), both durable.

  Returns `{work, retry}`, the results of the two `AMQP.Queue.declare/3`
  calls. The connection opened for the declaration is closed before returning.
  """
  def create_queue(queue_name \\ @queue_name) do
    retry_queue = queue_name <> @retry_suffix

    with_channel(fn channel ->
      work =
        AMQP.Queue.declare(channel, queue_name,
          arguments: dead_letter_arguments(retry_queue),
          durable: true
        )

      # The retry queue only parks messages: once the TTL expires they are
      # dead-lettered back to the work queue.
      retry =
        AMQP.Queue.declare(channel, retry_queue,
          arguments:
            dead_letter_arguments(queue_name) ++ [{"x-message-ttl", :long, @retry_ttl_ms}],
          durable: true
        )

      {work, retry}
    end)
  end

  @doc """
  Deletes the work queue `queue_name` and its retry queue.

  Returns `:ok`.
  """
  def delete_queue(queue_name \\ @queue_name) do
    with_channel(fn channel ->
      AMQP.Queue.delete(channel, queue_name)
      AMQP.Queue.delete(channel, queue_name <> @retry_suffix)
    end)

    :ok
  end

  @doc """
  Deletes and re-creates both queues. Returns the result of `create_queue/1`.
  """
  def reset_queue(queue_name \\ @queue_name) do
    delete_queue(queue_name)
    create_queue(queue_name)
  end

  @doc """
  Spawns a process that consumes `queue_name` one message at a time
  (`prefetch_count: 1`) and returns its pid.

  Send `:close` to the returned pid to stop the consumer.
  """
  def start_consumer(queue_name \\ @queue_name) do
    spawn(fn ->
      {:ok, connection} = AMQP.Connection.open()
      {:ok, channel} = AMQP.Channel.open(connection)
      :ok = AMQP.Basic.qos(channel, prefetch_count: 1)
      {:ok, consumer_tag} = AMQP.Basic.consume(channel, queue_name)
      run_consumer({connection, channel, consumer_tag})
    end)
  end

  @doc """
  Publishes `message` as a persistent message to `queue_name` through the
  default exchange.

  Returns the result of `AMQP.Basic.publish/5`.
  """
  def send_message(queue_name \\ @queue_name, message) do
    with_channel(fn channel ->
      AMQP.Basic.publish(channel, "", queue_name, message, persistent: true)
    end)
  end

  @doc """
  Receive loop of the consumer process.

  Handles three kinds of messages:

    * `{:basic_deliver, message, meta}` - a delivery; it is acked, or nacked
      without requeue when the payload matches `fail` and the retry limit has
      not been reached yet;
    * `:close` - closes the connection and leaves the loop, returning `nil`;
    * anything else - printed and ignored.
  """
  def run_consumer({connection, channel, _consumer_tag} = state) do
    receive do
      {:basic_deliver, message, meta} ->
        IO.puts("Received from RabbitMQ: #{message}")
        IO.puts("Meta: #{inspect(meta)}")
        handle_delivery(channel, message, meta)
        run_consumer(state)

      :close ->
        IO.puts("Recieved: closed")
        AMQP.Connection.close(connection)
        # Leaving the loop yields nil, as it always has.
        nil

      message ->
        IO.puts("Recieved: #{inspect(message)}")
        run_consumer(state)
    end
  end

  @doc """
  Returns how many times a message has been dead-lettered.

  Accepts either the delivery metadata (a map with a `:headers` key) or the
  `{"x-death", :array, tables}` header itself. The result is the largest
  `"count"` value found among the `x-death` tables, or `0` when there is no
  such header or no count.
  """
  @spec retry_count(term()) :: integer()
  def retry_count(%{headers: :undefined}), do: 0

  def retry_count(%{headers: headers}) do
    # A missing header yields nil, which falls through to the catch-all clause.
    headers
    |> List.keyfind("x-death", 0)
    |> retry_count()
  end

  def retry_count({"x-death", :array, tables}) do
    tables
    |> Enum.map(&death_count/1)
    |> Enum.max(fn -> 0 end)
  end

  def retry_count(_), do: 0

  # Extracts the "count" attribute of one x-death table entry (0 when absent).
  defp death_count({_type, attributes}) do
    case List.keyfind(attributes, "count", 0) do
      {_key, _type, count} -> count
      nil -> 0
    end
  end

  # Acks or rejects a single delivery according to the sample's retry policy.
  defp handle_delivery(channel, message, %{delivery_tag: tag} = meta) do
    cond do
      not (message =~ ~r/fail/) ->
        AMQP.Basic.ack(channel, tag)

      retry_count(meta) >= @max_retries ->
        IO.puts("Failed more than #{@max_retries} times already. Stop retrying!")
        AMQP.Basic.ack(channel, tag)

      true ->
        IO.puts("Received failed message! It will be retried")
        # You have to set false to requeue option to be dead lettered
        AMQP.Basic.nack(channel, tag, requeue: false)
    end
  end

  # Queue arguments that dead-letter messages via the default exchange to `routing_key`.
  defp dead_letter_arguments(routing_key) do
    [
      {"x-dead-letter-exchange", :longstr, ""},
      {"x-dead-letter-routing-key", :longstr, routing_key}
    ]
  end

  # Opens a connection and a channel, runs `fun` with the channel and always
  # closes the connection afterwards, even when `fun` raises.
  defp with_channel(fun) do
    {:ok, connection} = AMQP.Connection.open()

    try do
      {:ok, channel} = AMQP.Channel.open(connection)
      fun.(channel)
    after
      AMQP.Connection.close(connection)
    end
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment