Skip to content

Instantly share code, notes, and snippets.

@merqlove
Last active July 12, 2017 23:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save merqlove/625040c543b7badcedbea4e4a4c9e55d to your computer and use it in GitHub Desktop.
Save merqlove/625040c543b7badcedbea4e4a4c9e55d to your computer and use it in GitHub Desktop.
Elixir AMQP resilient publisher
defmodule Publisher do
  @moduledoc """
  GenServer that keeps an AMQP connection and channel open and publishes
  persistent messages to a durable queue through the default exchange.

  The server is registered locally as `:publisher_server`. When the broker
  cannot be reached, or the monitored connection process goes down, a new
  connection attempt is made; failed attempts are retried every
  `@reconnect_timeout` milliseconds.
  """

  use GenServer
  use AMQP
  require Logger

  # The port is an integer: it is handed to `Connection.open/1` unchanged.
  @amqp [host: "localhost", port: 5682]
  @reconnect_timeout 5000
  @server :publisher_server

  ## Client API

  @doc """
  Starts the publisher and registers it as `:publisher_server`.
  """
  def start_link() do
    GenServer.start_link(__MODULE__, [], name: @server)
  end

  @doc """
  Returns the current server state map
  (`:connected`, `:channel`, `:queue_name` and `:conn`).
  """
  def lookup, do: GenServer.call(@server, {:lookup})

  @doc """
  Publishes `message` to the configured queue as a persistent message.

  Returns the result of `Basic.publish/5` while connected, or
  `{:error, :not_connected}` while the connection is down.
  Returns `:error` when `message` is not a binary.
  """
  def publish(message) when is_binary(message) do
    GenServer.call(@server, {:publish, message})
  end

  def publish(_), do: :error

  ## Server callbacks

  @impl true
  def init(_) do
    state = %{connected: false, channel: nil, queue_name: "task_queue", conn: nil}
    {:ok, rabbitmq_connect(state)}
  end

  @impl true
  def handle_call({:lookup}, _from, state), do: {:reply, state, state}

  def handle_call(
        {:publish, message},
        _from,
        %{channel: chan, connected: true, queue_name: queue_name} = state
      ) do
    result = Basic.publish(chan, "", queue_name, message, persistent: true)
    {:reply, result, state}
  end

  # Without this clause a publish attempted while disconnected would crash
  # the server with a FunctionClauseError.
  def handle_call({:publish, _message}, _from, state) do
    {:reply, {:error, :not_connected}, state}
  end

  # A monitored process (the AMQP connection) went down: connect again and
  # keep the state returned by the attempt.
  @impl true
  def handle_info({:DOWN, _ref, :process, _pid, _reason}, state) do
    {:noreply, rabbitmq_connect(state)}
  end

  # Timer message scheduled by `schedule_reconnect/0` after a failed attempt.
  def handle_info(:rabbitmq_connect, state) do
    {:noreply, rabbitmq_connect(state)}
  end

  # Any other message is logged and ignored.
  def handle_info(msg, state) do
    Logger.warn("Unknown msg received #{inspect(msg)}")
    {:noreply, state}
  end

  # Nothing to close when no connection was ever established.
  @impl true
  def terminate(_reason, %{conn: nil}), do: :ok

  def terminate(_reason, %{conn: conn}) do
    Connection.close(conn)
    :ok
  end

  ## Internals

  # Tries to open a connection and a channel and to declare the durable queue.
  # Returns the updated state. On failure the state is marked as disconnected
  # and another attempt is scheduled.
  defp rabbitmq_connect(%{queue_name: queue_name} = state) do
    # Empty settings are dropped rather than passed as explicit nils, so that
    # the client library's own defaults apply to them.
    opts = Enum.reject(@amqp, fn {_key, value} -> value in [nil, ""] end)

    case Connection.open(opts) do
      {:ok, conn} ->
        # The :DOWN message from this monitor drives reconnection.
        Process.monitor(conn.pid)
        {:ok, chan} = Channel.open(conn)
        Queue.declare(chan, queue_name, durable: true)
        %{state | channel: chan, connected: true, conn: conn}

      {:error, reason} ->
        Logger.warn("AMQP connection failed: #{inspect(reason)}")
        schedule_reconnect()
        %{state | channel: nil, connected: false, conn: nil}
    end
  end

  # Sends `:rabbitmq_connect` to this process after `@reconnect_timeout` ms.
  defp schedule_reconnect do
    Process.send_after(self(), :rabbitmq_connect, @reconnect_timeout)
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment