Skip to content

Instantly share code, notes, and snippets.

@ono
Created January 17, 2017 12:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ono/107ea45734e3bbcba19a03c1466f7253 to your computer and use it in GitHub Desktop.
Save ono/107ea45734e3bbcba19a03c1466f7253 to your computer and use it in GitHub Desktop.
defmodule Sample.DlxTest do
@moduledoc """
DlxTest shows a sample code of how to work with Dead Exchange Letter on RabbitMQ.
Dead Letter Exchange will let you control retry logic.
## Requirement
- [AMQP](https://hex.pm/packages/amqp)
## How to run
iex> import Sample.DlxTest
iex> reset_queue() # set up queue
iex> consumer = start_consumer() # start consumer
iex> send_message("Hello") # Normal message
iex> send_message("Please fail") # Simulate error
iex> send(consumer, :close) # Stop consumer
"""
@queue_name "dlx"
def create_queue(queue_name \\ @queue_name) do
{:ok, connection} = AMQP.Connection.open []
{:ok, channel} = AMQP.Channel.open(connection)
retry_queue = queue_name <> ".retry"
main_options = [
arguments: [
{"x-dead-letter-exchange", :longstr, ""},
{"x-dead-letter-routing-key", :longstr, retry_queue}
],
durable: true
]
work = AMQP.Queue.declare(channel, queue_name, main_options)
retry_options = [
arguments: [
{"x-dead-letter-exchange", :longstr, ""},
{"x-dead-letter-routing-key", :longstr, queue_name},
{"x-message-ttl", :long, 10000}
],
durable: true
]
retry = AMQP.Queue.declare(channel, retry_queue, retry_options)
{work, retry}
end
def delete_queue(queue_name \\ @queue_name) do
{:ok, connection} = AMQP.Connection.open
{:ok, channel} = AMQP.Channel.open(connection)
AMQP.Queue.delete(channel, queue_name)
AMQP.Queue.delete(channel, queue_name <> ".retry")
AMQP.Connection.close(connection)
end
def reset_queue(queue_name \\ @queue_name) do
delete_queue(queue_name)
create_queue(queue_name)
end
def start_consumer(queue_name \\ @queue_name) do
spawn fn ->
{:ok, connection} = AMQP.Connection.open
{:ok, channel} = AMQP.Channel.open(connection)
:ok = AMQP.Basic.qos(channel, prefetch_count: 1)
{:ok, consumer_tag} = AMQP.Basic.consume(channel, queue_name)
run_consumer({connection, channel, consumer_tag})
end
end
def send_message(queue_name \\ @queue_name, message) do
{:ok, connection} = AMQP.Connection.open
{:ok, channel} = AMQP.Channel.open(connection)
publish_state = AMQP.Basic.publish(channel, "", queue_name, message, persistent: true)
AMQP.Connection.close(connection)
publish_state
end
def run_consumer({connection, channel, consumer_tag}) do
alive =
receive do
{:basic_deliver, message, meta} ->
IO.puts "Received from RabbitMQ: #{message}"
IO.puts "Meta: #{inspect meta}"
%{delivery_tag: tag, redelivered: redelivered} = meta
if message =~ ~r/fail/ do
if retry_count(meta) >= 10 do
IO.puts "Failed more than 10 times already. Stop retrying!"
AMQP.Basic.ack(channel, tag)
else
IO.puts "Received failed message! It will be retried"
# You have to set false to requeue option to be dead lettered
AMQP.Basic.nack(channel, tag, requeue: false)
end
else
AMQP.Basic.ack(channel, tag)
end
true
:close ->
IO.puts "Recieved: closed"
AMQP.Connection.close(connection)
false
message -> nil
IO.puts "Recieved: #{inspect message}"
true
end
if alive, do: run_consumer({connection, channel, consumer_tag})
end
def retry_count(%{headers: :undefined}), do: 0
def retry_count(%{headers: headers}) do
x_death = Enum.find headers, fn ({key, _, _}) ->
key == "x-death"
end
retry_count(x_death)
end
def retry_count({"x-death", :array, tables}) do
tables
|> Enum.map(fn({_, attributes}) ->
count_attr = Enum.find attributes, fn ({key, _, _}) ->
key == "count"
end
if count_attr do
{_, _, count} = count_attr
count
else
0
end
end)
|> Enum.max(fn -> 0 end)
end
def retry_count(_), do: 0
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment