Skip to content

Instantly share code, notes, and snippets.

@keathley
Created May 22, 2019 15:39
Show Gist options
  • Save keathley/68001608877623a4f48b8138e3f353f1 to your computer and use it in GitHub Desktop.
Save keathley/68001608877623a4f48b8138e3f353f1 to your computer and use it in GitHub Desktop.
Rabbit connection statemachine
defmodule RabbitConnection do
@moduledoc """
RabbitMQ client connection
"""
@behaviour :gen_statem
use AMQP
alias AMQP.Basic
@exchange "messages_exchange"
@queue "custom_alert_queue"
@queue_error "#{@queue}_error"
@retry_time 1_000
require Logger
def queue, do: @queue
def callback_mode do
[:state_functions, :state_enter]
end
def child_spec(opts) do
%{
id: __MODULE__,
start: {__MODULE__, :start_link, [opts]},
type: :worker,
restart: :permanent,
shutdown: 500
}
end
def start_link({conn_opts, opts}) do
config =
config()
|> Keyword.merge(conn_opts)
case Keyword.pop(opts, :name) do
{nil, opts} ->
:gen_statem.start_link(__MODULE__, config, opts)
{name, opts} ->
:gen_statem.start_link({:local, name}, __MODULE__, config, opts)
end
end
def send_notification(server, notification) do
:gen_statem.call(server, {:publish, Jason.encode!(notification)})
end
def status(server) do
:gen_statem.call(server, :status)
end
def init(conn_opts) do
data = %{
channel: nil,
config: conn_opts,
error_count: 0
}
{:ok, :disconnected, data}
end
def disconnected(:enter, _, data) do
Logger.error("Entering disconnected state")
actions = [{:state_timeout, backoff(data.error_count), :connect}]
data = %{data | error_count: data.error_count + 1}
{:keep_state, data, actions}
end
def disconnected({:call, from}, {:publish, _}, _data) do
{:keep_state_and_data, [{:reply, from, {:error, :disconnected}}]}
end
def disconnected({:call, from}, :status, _data) do
{:keep_state_and_data, [{:reply, from, :disconnected}]}
end
def disconnected(:state_timeout, :connect, data) do
Logger.info("State timeout, #{inspect(data)}")
case Connection.open(data.config) do
{:ok, conn} ->
Process.monitor(conn.pid)
{:ok, chan} = Channel.open(conn)
setup_queues(chan)
{:next_state, :connected, %{data | channel: chan}}
{:error, _} ->
Statix.increment("rabbitmq.connection_failure")
Logger.error("Could not connect to rabbitmq")
{:repeat_state, data}
end
end
def connected(:enter, _, data) do
Statix.increment("rabbitmq.connected")
{:keep_state, %{data | error_count: 0}}
end
def connected({:call, from}, {:publish, notification}, data) do
res = Basic.publish(data.channel, @exchange, @queue, notification)
{:keep_state_and_data, [{:reply, from, res}]}
end
def connected({:call, from}, :status, _data) do
{:keep_state_and_data, [{:reply, from, :connected}]}
end
def connected(:info, {:DOWN, _, :process, _pid, _reason}, data) do
Statix.increment("rabbitmq.disconnect")
{:next_state, :disconnected, data}
end
defp setup_queues(chan) do
{:ok, _} = Queue.declare(chan, @queue_error, durable: true)
{:ok, _} =
Queue.declare(chan, @queue,
durable: true,
arguments: [
{"x-dead-letter-exchange", :longstr, ""},
{"x-dead-letter-routing-key", :longstr, @queue_error}
]
)
Exchange.declare(chan, @exchange, :direct, durable: true)
Queue.bind(chan, @queue, @exchange, routing_key: @queue)
end
defp backoff(0), do: 0
defp backoff(count) do
factor = :rand.uniform(trunc(:math.pow(2, count)) - 1)
factor * @retry_time
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment