Skip to content

Instantly share code, notes, and snippets.

@benwilson512
Created November 15, 2021 14:16
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save benwilson512/104fe1d4b35c11a259dd788ad00e011b to your computer and use it in GitHub Desktop.
Save benwilson512/104fe1d4b35c11a259dd788ad00e011b to your computer and use it in GitHub Desktop.
# TODO: Extract into its own library
defmodule AbsintheSocket do
require Logger
alias Phoenix.Channels.GenSocketClient
@behaviour GenSocketClient
@control_topic "__absinthe__:control"
# Map-backed accessors for the state struct. fetch/2, get_and_update/3 and
# pop/2 are what Kernel helpers such as pop_in/2 call on a struct, which is how
# handle_reply/5 can do `pop_in(state, [:inflight, ref])`. get/3 is a plain
# convenience forwarder.
def fetch(term, key), do: Map.fetch(term, key)
def get(term, key, default), do: Map.get(term, key, default)
def get_and_update(term, key, fun), do: Map.get_and_update(term, key, fun)
def pop(term, key), do: Map.pop(term, key)
# State carried by the socket process:
#
#   * pids                 - subscriber pid => list of subscription ids registered
#                            for that pid (used by :clear_subscriptions)
#   * channel_connected    - true once the control topic has been joined; pending
#                            operations are only pushed while this is true
#   * active_subscriptions - subscription id => %{pid: subscriber, payload: request}
#   * inflight             - push ref => operation that was pushed and awaits a reply
#   * pending              - operations queued until they can be pushed; new entries
#                            are prepended, so the newest is at the head
defstruct pids: %{},
channel_connected: false,
active_subscriptions: %{},
inflight: %{},
pending: []
@doc """
Runs a GraphQL document through the socket process.

`opts` (any enumerable of key/value pairs) is converted to a map, `query` is
stored under `:query`, and the resulting payload is sent to the socket with a
synchronous `{:run, payload}` call. For a subscription the socket replies with
`{:ok, subscription_id}` once the server has acknowledged it.
"""
def run(socket, query, opts) do
  request = Map.put(Map.new(opts), :query, query)
  GenSocketClient.call(socket, {:run, request})
end
@doc """
Asks the socket process to unsubscribe everything registered for the calling
process. The socket answers the call with `:ok`.
"""
def clear_subscriptions(socket),
  do: GenSocketClient.call(socket, :clear_subscriptions)
@doc """
Starts the socket process with this module as the callback module and the
WebSocket client transport. `opts` is handed to `init/1`, which reads `:url`
and `:params` from it.
"""
def start_link(opts) do
  transport_mod = GenSocketClient.Transport.WebSocketClient
  GenSocketClient.start_link(__MODULE__, transport_mod, opts)
end
# GenSocketClient callback: builds the initial state and requests a connection.
#
# Reads `:url` and `:params` from `opts`. `:params` is optional: when it is
# absent an empty parameter list is used instead of raising on
# `Enum.to_list(nil)`.
def init(opts) do
  params = opts[:params] || []
  {:connect, opts[:url], Enum.to_list(params), %__MODULE__{}}
end
# Queues a document for the caller and tries to push it right away. No reply is
# sent here; the caller is answered from handle_reply/5 once the server
# responds to the push.
def handle_call({:run, payload}, {pid, _} = from, transport, state) do
  operation = %{reply_to: from, pid: pid, payload: payload}
  queued = %{state | pending: [operation | state.pending]}
  {:noreply, push_messages(queued, transport)}
end
# Drops every subscription owned by the calling process.
#
# For each subscription id registered for the caller an "unsubscribe" push is
# sent on the control topic (best effort, the push result is not inspected),
# then the ids are removed from `active_subscriptions` and `pids`.
#
# Re-subscriptions that enqueue_subscriptions/1 queued for the caller
# (`reply_to: nil`) are removed from `pending` as well; otherwise a clear issued
# while disconnected would be undone as soon as the socket reconnects. Pending
# operations with a waiting caller are left alone.
def handle_call(:clear_subscriptions, {pid, _}, transport, state) do
  {sub_ids, pids} = Map.pop(state.pids, pid)
  sub_ids = sub_ids || []

  Enum.each(sub_ids, fn sub_id ->
    Logger.debug(fn ->
      "Unsubscribing from #{sub_id}"
    end)

    GenSocketClient.push(transport, @control_topic, "unsubscribe", %{"subscriptionId" => sub_id})
  end)

  pending =
    Enum.reject(state.pending, fn op ->
      match?(%{pid: ^pid, reply_to: nil}, op)
    end)

  state = %{
    state
    | active_subscriptions: Map.drop(state.active_subscriptions, sub_ids),
      pids: pids,
      pending: pending
  }

  {:reply, :ok, state}
end
# GenSocketClient callback: the transport is up, so request to join the control
# topic. The join outcome is reported through handle_joined/4 or
# handle_join_error/4; the immediate return value is not used.
def handle_connected(transport, state) do
  _join_result = GenSocketClient.join(transport, @control_topic)
  {:ok, state}
end
# GenSocketClient callback: the connection was lost.
#
# Schedules a reconnect in one second, marks the control channel as down and
# moves everything that has to be re-sent back onto `pending`:
#
#   * operations that were pushed but never answered (`inflight`). Their
#     replies cannot arrive any more, so without this the callers would wait
#     until their call times out and in-flight re-subscriptions would be lost.
#     NOTE(review): a document the server already executed before the drop is
#     sent again; confirm that is acceptable for non-idempotent mutations.
#   * the active subscriptions, via enqueue_subscriptions/1.
def handle_disconnected(reason, state) do
  Logger.warn("disconnected: #{inspect(reason)}")
  Process.send_after(self(), :connect, :timer.seconds(1))

  # `pending` is newest-first, so the older in-flight operations go at the tail.
  unanswered = Map.values(state.inflight)

  state =
    enqueue_subscriptions(%{
      state
      | channel_connected: false,
        inflight: %{},
        pending: state.pending ++ unanswered
    })

  {:ok, state}
end
# GenSocketClient callback: the control topic has been joined. Flags the
# channel as usable and flushes whatever was queued in the meantime.
def handle_joined(@control_topic, _payload, transport, state) do
  connected = %{state | channel_connected: true}
  {:ok, push_messages(connected, transport)}
end
# GenSocketClient callback: a join attempt failed. An :already_joined error on
# the control topic is ignored; every other failure is logged. The state is
# returned unchanged in both cases.
def handle_join_error(topic, payload, _transport, state) do
  case {topic, payload} do
    {@control_topic, :already_joined} ->
      :ok

    _other ->
      Logger.error("join error on the topic #{topic}: #{inspect(payload)}")
  end

  {:ok, state}
end
# GenSocketClient callback: a channel was closed by the server.
#
# Logs the event and schedules a re-join in one second. When the closed channel
# is the control topic the state is also marked as not connected, so that
# push_messages/2 queues operations instead of pushing onto a topic that is no
# longer joined (the push would return an error and the assertive match there
# would crash the socket process). The active subscriptions are queued for
# re-submission once the topic is joined again.
# NOTE(review): this assumes the server drops subscriptions together with the
# channel; confirm against the server implementation.
def handle_channel_closed(topic, payload, _transport, state) do
  Logger.error("disconnected from the topic #{topic}: #{inspect(payload)}")
  Process.send_after(self(), {:join, topic}, :timer.seconds(1))
  {:ok, mark_channel_closed(topic, state)}
end

# Only the control topic carries our operations; other topics leave the state as is.
defp mark_channel_closed(@control_topic, state) do
  enqueue_subscriptions(%{state | channel_connected: false})
end

defp mark_channel_closed(_topic, state), do: state
# GenSocketClient callback: a server-initiated message arrived.
#
# "subscription:data" messages are looked up by topic in `active_subscriptions`
# (the topic is the subscription id) and forwarded to the owning pid as an
# `AbsintheSocket.Message` struct; data for an unknown topic is logged.
#
# Any other event or payload shape is logged and ignored. Without the fallback
# clause such a message would raise a FunctionClauseError and take the socket
# process down.
def handle_message(topic, "subscription:data", %{"result" => result}, _transport, state) do
  case Map.fetch(state.active_subscriptions, topic) do
    {:ok, %{pid: pid}} ->
      message = %__MODULE__.Message{
        id: topic,
        payload: result,
        type: :subscription_data
      }

      send(pid, message)

    _ ->
      Logger.error("Subscription data for unmatched topic #{topic}")
  end

  {:ok, state}
end

def handle_message(topic, event, payload, _transport, state) do
  Logger.warn(
    "#{__MODULE__} Unhandled message #{inspect(event)} on the topic #{topic}: #{inspect(payload)}"
  )

  {:ok, state}
end
# GenSocketClient callback: the server replied to one of our pushes.
#
# First clause: a successful reply on the control topic that carries a
# subscription id. If `ref` belongs to an in-flight operation, the subscription
# is recorded under its id and under the owner pid, and the waiting caller (if
# any; re-subscriptions have `reply_to: nil`) receives `{:ok, sub_id}`. Replies
# whose ref is not in flight are ignored.
#
# Second clause: every other reply (error status, or an "ok" reply without a
# subscription id). The original module had no clause for these, so such a
# reply crashed the socket process and left the caller waiting. Now the
# in-flight entry is removed and a waiting caller is answered with
# `{:ok, response}` for an "ok" status or `{:error, payload}` otherwise.
def handle_reply(
      @control_topic,
      ref,
      %{"response" => %{"subscriptionId" => sub_id}, "status" => "ok"},
      _transport,
      state
    ) do
  state =
    case pop_in(state, [:inflight, ref]) do
      {%{reply_to: from, pid: pid, payload: payload}, state} ->
        active_subscriptions =
          Map.put(state.active_subscriptions, sub_id, %{pid: pid, payload: payload})

        pids = Map.update(state.pids, pid, [sub_id], &[sub_id | &1])

        if from do
          GenSocketClient.reply(from, {:ok, sub_id})
        end

        %{
          state
          | active_subscriptions: active_subscriptions,
            pids: pids
        }

      {_, state} ->
        state
    end

  {:ok, state}
end

def handle_reply(topic, ref, payload, _transport, state) do
  {operation, state} = pop_in(state, [:inflight, ref])

  case {operation, payload} do
    # Not one of our tracked pushes; nothing to do.
    {nil, _} ->
      :ok

    # A re-subscription has no caller to notify, so the failure is only logged.
    {%{reply_to: nil}, _} ->
      Logger.error("unexpected reply to a re-subscription on the topic #{topic}: #{inspect(payload)}")

    {%{reply_to: from}, %{"status" => "ok", "response" => response}} ->
      GenSocketClient.reply(from, {:ok, response})

    {%{reply_to: from}, _} ->
      GenSocketClient.reply(from, {:error, payload})
  end

  {:ok, state}
end
# GenSocketClient callback for plain process messages.
#
#   * :connect       - sent by the timer armed in handle_disconnected/2; asks
#                      the client to reconnect.
#   * {:join, topic} - sent by the re-join timers; tries to join `topic` and
#                      re-arms the timer when the join fails for any reason
#                      other than :already_joined.
#   * anything else  - logged and ignored.
def handle_info(:connect, _transport, state) do
  Logger.info("attempting to reconnect")
  {:connect, state}
end

def handle_info({:join, topic}, transport, state) do
  Logger.debug("joining the topic #{topic}")

  case GenSocketClient.join(transport, topic) do
    {:ok, _ref} ->
      :ok

    {:error, :already_joined} ->
      :ok

    {:error, why} ->
      Logger.error("error joining the topic #{topic}: #{inspect(why)}")
      Process.send_after(self(), {:join, topic}, :timer.seconds(1))
  end

  {:ok, state}
end

def handle_info(unexpected, _transport, state) do
  Logger.warn("#{__MODULE__} Unhandled message #{inspect(unexpected)}")
  {:ok, state}
end
# Pushes every queued operation as a "doc" event on the control topic.
#
# Only acts while `channel_connected` is true; otherwise the state is returned
# untouched and the operations stay queued.
#
# `pending` is built by prepending, so it is reversed first and operations are
# sent in the order they were queued. Each successfully pushed operation is
# stored in `inflight` under the ref returned by the push. An operation whose
# push returns an error is logged and kept in `pending` for the next flush,
# rather than crashing the socket process on a failed match.
def push_messages(%{channel_connected: true} = state, transport) do
  Logger.debug(fn ->
    "Pushing #{length(state.pending)} messages"
  end)

  {inflight, unsent} =
    state.pending
    |> Enum.reverse()
    |> Enum.reduce({state.inflight, []}, fn %{payload: payload} = op, {inflight, unsent} ->
      Logger.debug(fn ->
        """
        #{__MODULE__} socket #{inspect(self())} pushing #{inspect(payload)}
        """
      end)

      case GenSocketClient.push(transport, @control_topic, "doc", payload) do
        {:ok, ref} ->
          {Map.put(inflight, ref, op), unsent}

        {:error, reason} ->
          Logger.error("#{__MODULE__} push failed: #{inspect(reason)}")
          # Prepending while walking oldest-first keeps `pending` newest-first.
          {inflight, [op | unsent]}
      end
    end)

  %{state | inflight: inflight, pending: unsent}
end

def push_messages(state, _transport), do: state
# Moves every active subscription back onto the pending queue so it is
# submitted again by the next push_messages/2.
#
# Each queued entry keeps the original payload and owner pid and has
# `reply_to: nil`, because nobody is waiting for the answer to a
# re-subscription. `active_subscriptions` is emptied, and so is `pids`: its ids
# refer to the subscriptions that were just deactivated, and handle_reply/5
# registers the ids again when the re-subscriptions are acknowledged. Leaving
# the old ids in place would pile up stale or duplicate entries on every
# reconnect.
def enqueue_subscriptions(state) do
  resubscriptions =
    Enum.map(state.active_subscriptions, fn {_sub_id, %{payload: payload, pid: pid}} ->
      %{payload: payload, pid: pid, reply_to: nil}
    end)

  %{
    state
    | active_subscriptions: %{},
      pids: %{},
      # Same ordering as prepending one by one onto the existing queue.
      pending: Enum.reverse(resubscriptions, state.pending)
  }
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment