Skip to content

Instantly share code, notes, and snippets.

@ulfurinn
Last active March 24, 2023 21:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ulfurinn/f7d3f5bf552fe6855ba10e1983743496 to your computer and use it in GitHub Desktop.
Save ulfurinn/f7d3f5bf552fe6855ba10e1983743496 to your computer and use it in GitHub Desktop.
Phoenix PubSub over AMQP

A simple PubSub adapter that sends messages between nodes over AMQP.

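The app supervisor needs to include the following child spec, where `amqp_connection_id` is the name of a connection managed by `AMQP.Application` (the adapter looks it up with `AMQP.Application.get_connection/1`):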

{Phoenix.PubSub, name: App.PubSub, adapter: Phoenix.PubSub.AMQP, amqp: [connection: amqp_connection_id]},
defmodule Phoenix.PubSub.AMQP do
use Supervisor
require Logger
@behaviour Phoenix.PubSub.Adapter
@doc """
Starts the adapter supervisor.

`opts` is the keyword list passed to the adapter child; the same list is
handed unchanged to `init/1` and on to the gateway process.

The supervisor is registered under a name derived from `:adapter_name`, so
that several PubSub instances can each run their own copy of this adapter.
When `:adapter_name` is absent the module name is used.
"""
def start_link(opts) do
  supervisor_name =
    case opts[:adapter_name] do
      nil -> __MODULE__
      # adapter names come from static configuration, so the set of atoms
      # created here is bounded
      adapter_name -> :"#{adapter_name}_supervisor"
    end

  Supervisor.start_link(__MODULE__, opts, name: supervisor_name)
end
# Supervisor callback: runs a single gateway process, restarted on its own
# (:one_for_one) if it crashes. The options are passed through untouched.
@impl Supervisor
def init(opts) do
  # Example of opts:
  # [adapter_name: App.PubSub.Adapter, name: App.PubSub, adapter: Phoenix.PubSub.AMQP, amqp: [connection: ...]]
  gateway_spec = {__MODULE__.Gateway, opts}

  [gateway_spec]
  |> Supervisor.init(strategy: :one_for_one)
end
# Adapter callback: forwards the broadcast to the gateway process registered
# under `adapter_name`.
@impl Phoenix.PubSub.Adapter
def broadcast(adapter_name, topic, message, dispatcher) do
  __MODULE__.Gateway.broadcast(adapter_name, topic, message, dispatcher)
end
# Adapter callback: forwards a broadcast aimed at a single node to the gateway
# process registered under `adapter_name`.
@impl Phoenix.PubSub.Adapter
def direct_broadcast(adapter_name, node_name, topic, message, dispatcher) do
  __MODULE__.Gateway.direct_broadcast(adapter_name, node_name, topic, message, dispatcher)
end
# Adapter callback: this adapter identifies nodes by their Erlang node name;
# the adapter name is not needed to answer.
@impl Phoenix.PubSub.Adapter
def node_name(_adapter_name) do
  node()
end
defmodule Gateway do
  @moduledoc """
  GenServer that owns the AMQP channel of one PubSub instance.

  Outgoing broadcasts are published to a fanout exchange named after the
  PubSub instance. Direct broadcasts are published with an empty exchange name
  and the target node's queue name (`"<pubsub>.<node>"`) as routing key.
  Every message carries `topic`, `sender` and `dispatcher` headers; the payload
  is the message encoded with `:erlang.term_to_binary/1`.

  Incoming deliveries that did not originate on this node are decoded and
  handed to `Phoenix.PubSub.local_broadcast/4`.
  """

  use GenServer
  require Logger

  # Delay in milliseconds before retrying when no connection/channel is available.
  @reconnect_interval 100
  # Value passed as :prefetch_count to AMQP.Basic.qos/2.
  @prefetch_count 25
  # Dispatcher used when a delivery names none, or names one that is unknown here.
  @default_dispatcher Phoenix.PubSub

  defstruct [:connection, :ch, :name, :ident, :node_name]

  ## Client API

  @doc """
  Starts the gateway, registered under `opts[:adapter_name]`.
  """
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: opts[:adapter_name])
  end

  @doc """
  Publishes `message` on `topic` to all nodes.

  Returns `:ok`, `{:error, :not_connected}` while no channel is open, or the
  error returned by `AMQP.Basic.publish/5`.
  """
  def broadcast(adapter_name, topic, message, dispatcher) do
    GenServer.call(adapter_name, {:broadcast, topic, message, dispatcher})
  end

  @doc """
  Publishes `message` on `topic` to `node_name` only.

  A broadcast aimed at the local node is dispatched locally without touching
  AMQP. The request always goes through the gateway process because only the
  gateway knows the PubSub name that `Phoenix.PubSub.local_broadcast/4` needs
  (`adapter_name` is the name of this process, not of the PubSub registry).
  """
  def direct_broadcast(adapter_name, node_name, topic, message, dispatcher) do
    GenServer.call(adapter_name, {:direct_broadcast, node_name, topic, message, dispatcher})
  end

  ## Server callbacks

  @impl true
  def init(opts) do
    state = %__MODULE__{
      connection: opts[:amqp][:connection],
      name: opts[:name],
      ident: Atom.to_string(opts[:name]),
      node_name: Atom.to_string(node())
    }

    # Connecting is deferred so that init/1 returns immediately.
    {:ok, state, {:continue, :connect}}
  end

  @impl true
  def handle_continue(:connect, state) do
    with {:ok, conn} <- AMQP.Application.get_connection(state.connection),
         {:ok, ch} <- AMQP.Channel.open(conn) do
      # The :DOWN message for the channel triggers a reconnect (see handle_info/2).
      Process.monitor(ch.pid)
      setup_channel_disposal(ch)
      :ok = AMQP.Basic.qos(ch, prefetch_count: @prefetch_count)
      {:noreply, %__MODULE__{state | ch: ch}, {:continue, :setup_topology}}
    else
      error ->
        Logger.debug(
          "#{inspect(__MODULE__)} could not open an AMQP channel, retrying: #{inspect(error)}"
        )

        Process.send_after(self(), :try_connect, @reconnect_interval)
        {:noreply, state}
    end
  end

  def handle_continue(:setup_topology, state) do
    :ok = AMQP.Exchange.fanout(state.ch, exchange(state), durable: true)
    {:ok, %{queue: queue}} = AMQP.Queue.declare(state.ch, queue(state), durable: true)
    :ok = AMQP.Queue.bind(state.ch, queue, exchange(state))
    {:ok, _tag} = AMQP.Basic.consume(state.ch, queue, nil)
    {:noreply, state}
  end

  @impl true
  def handle_call(
        {:broadcast, _topic, _message, _dispatcher},
        _from,
        %__MODULE__{ch: nil} = state
      ) do
    {:reply, {:error, :not_connected}, state}
  end

  def handle_call({:broadcast, topic, message, dispatcher}, _from, state) do
    # Empty routing key: the exchange is declared as fanout in :setup_topology.
    reply = publish(state, exchange(state), "", {topic, message, dispatcher})
    {:reply, reply, state}
  end

  def handle_call({:direct_broadcast, node_name, topic, message, dispatcher}, _from, state) do
    reply =
      cond do
        # Local delivery must not go through AMQP: deliveries whose sender is
        # this node are skipped in handle_info/2. It also works while the
        # channel is down.
        local_node?(node_name) ->
          Phoenix.PubSub.local_broadcast(state.name, topic, message, dispatcher)

        is_nil(state.ch) ->
          {:error, :not_connected}

        true ->
          # Empty exchange name, with the target node's queue name as routing key.
          publish(state, "", queue(state, node_name), {topic, message, dispatcher})
      end

    {:reply, reply, state}
  end

  @impl true
  def handle_info(:try_connect, state) do
    {:noreply, state, {:continue, :connect}}
  end

  def handle_info({:basic_consume_ok, %{consumer_tag: _}}, state) do
    {:noreply, state}
  end

  def handle_info(
        {:basic_deliver, payload, %{delivery_tag: dtag, headers: headers}},
        state
      ) do
    case deliver(payload, headers, state) do
      :ok ->
        AMQP.Basic.ack(state.ch, dtag)

      :error ->
        # Requeueing a message that can never be processed would only make it
        # come back again, so it is dropped.
        Logger.error(
          "#{inspect(__MODULE__)} rejecting malformed delivery, headers: #{inspect(headers)}"
        )

        AMQP.Basic.reject(state.ch, dtag, requeue: false)
    end

    {:noreply, state}
  end

  def handle_info(
        {:DOWN, _ref, :process, pid, _reason},
        %__MODULE__{ch: %AMQP.Channel{pid: pid}} = state
      ) do
    # The channel died: forget it and start over with a fresh one.
    {:noreply, %__MODULE__{state | ch: nil}, {:continue, :connect}}
  end

  def handle_info(msg, state) do
    Logger.error(
      "#{__MODULE__} #{Process.info(self())[:registered_name]} received unexpected message in handle_info/2: #{inspect(msg)}"
    )

    {:noreply, state}
  end

  @impl true
  def terminate(reason, _state) do
    Logger.info("exiting: #{inspect(reason)}")
    :ok
  end

  ## Helpers

  # Name of the fanout exchange shared by all nodes of this PubSub instance.
  defp exchange(%__MODULE__{ident: ident}), do: ident

  # Name of the queue consumed by this node / by node `n`.
  defp queue(state), do: queue(state, node())
  defp queue(%__MODULE__{ident: ident}, n), do: "#{ident}.#{n}"

  # Accepts the node name as an atom or as a string.
  defp local_node?(node_name), do: to_string(node_name) == Atom.to_string(node())

  # Publishes one message; returns whatever AMQP.Basic.publish/5 returns so the
  # caller sees publish errors instead of the gateway crashing on them.
  defp publish(state, exchange, routing_key, {topic, message, dispatcher}) do
    AMQP.Basic.publish(state.ch, exchange, routing_key, :erlang.term_to_binary(message),
      headers: [
        {"topic", :longstr, topic},
        {"sender", :longstr, state.node_name},
        {"dispatcher", :longstr, Atom.to_string(dispatcher)}
      ]
    )
  end

  # Dispatches one delivery to local subscribers unless this node sent it.
  # Returns :error when the topic header is missing or the payload cannot be decoded.
  defp deliver(payload, headers, state) do
    with topic when is_binary(topic) <- header(headers, "topic"),
         {:ok, message} <- decode(payload) do
      if header(headers, "sender") != state.node_name do
        Phoenix.PubSub.local_broadcast(state.name, topic, message, dispatcher(headers))
      end

      :ok
    else
      _ -> :error
    end
  end

  # NOTE(review): payloads are decoded without the :safe option, i.e. the
  # broker and everything able to publish to it is trusted. Confirm that this
  # holds for the deployment before exposing the exchange/queues to others.
  defp decode(payload) do
    {:ok, :erlang.binary_to_term(payload)}
  rescue
    ArgumentError -> :error
  end

  # Resolves the dispatcher named in the headers. Never creates new atoms and
  # falls back to the default dispatcher when the name is absent or is not a
  # module exporting dispatch/3 on this node.
  defp dispatcher(headers) do
    case header(headers, "dispatcher") do
      name when is_binary(name) -> known_dispatcher(name)
      _ -> @default_dispatcher
    end
  end

  defp known_dispatcher(name) do
    module = String.to_existing_atom(name)

    if Code.ensure_loaded?(module) and function_exported?(module, :dispatch, 3) do
      module
    else
      @default_dispatcher
    end
  rescue
    ArgumentError -> @default_dispatcher
  end

  # Looks up the value of header `name` in a list of {name, type, value} tuples.
  defp header(headers, name) when is_list(headers) do
    case List.keyfind(headers, name, 0) do
      {^name, _type, value} -> value
      _ -> nil
    end
  end

  # Headers are not a list (presumably :undefined when the publisher set
  # none — TODO confirm against the AMQP client).
  defp header(_headers, _name), do: nil

  # Spawns an unlinked helper process that monitors both the gateway and the
  # channel. If the gateway goes down first the helper closes the channel; if
  # the channel goes down first the helper just exits. The gateway blocks here
  # until the helper confirms that its monitors are in place.
  defp setup_channel_disposal(%AMQP.Channel{pid: ch_pid} = ch) do
    gateway = self()
    monitoring = make_ref()

    spawn(fn ->
      Process.monitor(gateway)
      Process.monitor(ch_pid)
      send(gateway, monitoring)

      receive do
        {:DOWN, _, _, ^gateway, _} ->
          AMQP.Channel.close(ch)
          nil

        {:DOWN, _, _, ^ch_pid, _} ->
          nil
      end
    end)

    receive do
      ^monitoring -> :ok
    end
  end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment