Skip to content

Instantly share code, notes, and snippets.

@mpoeter
Created December 5, 2018 10:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mpoeter/d18885caa3b46d47f031f6d913c93208 to your computer and use it in GitHub Desktop.
Save mpoeter/d18885caa3b46d47f031f6d913c93208 to your computer and use it in GitHub Desktop.
Async WebSocket queries
defmodule Absinthe.Phoenix.AsyncChannel do
  @moduledoc """
  Phoenix channel that executes GraphQL documents sent on the
  `"__absinthe__:control"` topic without blocking the channel process.

  Every document is parsed inside the channel process and then executed in a
  separate, monitored process:

    * documents without a mutation are answered asynchronously once the worker
      reports its result via an `{:async_finish, result, socket_ref}` message;
    * documents containing a mutation block the channel until the worker has
      finished, so mutations are executed sequentially.

  In-flight asynchronous workers are tracked in `socket.assigns.absinthe.tasks`
  as a map from monitor reference to Phoenix socket reference.
  """

  use Phoenix.Channel
  require Logger

  @doc false
  def join("__absinthe__:control", _, socket) do
    schema = socket.assigns[:__absinthe_schema__]
    absinthe_config = Map.get(socket.assigns, :absinthe, %{})

    # The endpoint is always exposed as `:pubsub` in the execution context,
    # whether or not a context was configured beforehand.
    opts =
      absinthe_config
      |> Map.get(:opts, [])
      |> Keyword.update(:context, %{pubsub: socket.endpoint}, fn context ->
        Map.put(context, :pubsub, socket.endpoint)
      end)

    absinthe_config =
      absinthe_config
      |> Map.put(:opts, opts)
      # An explicitly configured schema wins over the one assigned to the socket.
      |> Map.put_new(:schema, schema)
      # Start with an empty registry of in-flight asynchronous workers.
      |> Map.put(:tasks, %{})

    {:ok, assign(socket, :absinthe, absinthe_config)}
  end

  @doc false
  def handle_in("doc", payload, socket) do
    config = socket.assigns[:absinthe]
    query = Map.get(payload, "query", "")
    opts = Keyword.put(config.opts, :variables, Map.get(payload, "variables", %{}))

    Absinthe.Logger.log_run(:debug, {query, config.schema, [], opts})

    result =
      case Absinthe.Phase.Parse.run(query, []) do
        {:ok, parsed_query} ->
          run_query(parsed_query, config.schema, socket, opts)

        # NOTE(review): the error tuple of the parse phase is replied verbatim;
        # confirm that its payload is a map the socket serializer can encode.
        {:error, _} = err ->
          {:reply, err, socket}
      end

    # Asynchronous queries return `{:noreply, socket}` here; their reply is
    # logged later, when the `:async_finish` message is handled.
    case result do
      {:reply, reply, _socket} -> log_reply(reply)
      _noreply -> :ok
    end

    result
  end

  def handle_in("unsubscribe", %{"subscriptionId" => doc_id}, socket) do
    Phoenix.PubSub.unsubscribe(socket.pubsub_server, doc_id)
    Absinthe.Subscription.unsubscribe(socket.endpoint, doc_id)
    {:reply, {:ok, %{subscriptionId: doc_id}}, socket}
  end

  @doc false
  def handle_info({:async_finish, result, socket_ref}, socket) do
    {response, socket} = translate_result(result, socket)
    log_reply(response)
    reply(socket_ref, response)
    {:noreply, socket}
  end

  def handle_info({:DOWN, task_ref, _, _, :normal}, socket) do
    # The worker exited normally -> simply unregister it; the reply is sent
    # by the `:async_finish` clause above.
    {:noreply, unregister_task(socket, task_ref)}
  end

  def handle_info({:DOWN, task_ref, _, _, reason}, socket) do
    case Map.fetch(socket.assigns.absinthe.tasks, task_ref) do
      {:ok, socket_ref} ->
        # The worker died before reporting a result -> answer with an error.
        Logger.error("Absinthe query task exited abnormally: #{inspect(reason)}")
        reply(socket_ref, :error)
        {:noreply, unregister_task(socket, task_ref)}

      :error ->
        # Not one of our workers; ignoring it keeps the channel alive instead
        # of crashing on an unknown monitor reference.
        {:noreply, socket}
    end
  end

  def handle_info(_message, socket) do
    # Defining handle_info/2 ourselves means unrelated messages would otherwise
    # raise a FunctionClauseError, so they are ignored explicitly.
    {:noreply, socket}
  end

  # Executes the parsed document in a monitored process. Returns a `:reply`
  # tuple for documents containing a mutation (after waiting for the worker)
  # and a `:noreply` tuple with the worker registered otherwise.
  defp run_query(parsed_query, schema, socket, opts) do
    pipeline = skip_parse_pipeline(schema, opts)
    socket_ref = Phoenix.Channel.socket_ref(socket)
    channel = self()

    {_pid, task_ref} =
      spawn_monitor(fn ->
        result = exec_query(parsed_query, pipeline)
        send(channel, {:async_finish, result, socket_ref})
      end)

    if Enum.any?(parsed_query.input.definitions, &mutation?/1) do
      # Mutations are executed sequentially, so we wait for the worker to finish.
      wait_for_task(task_ref, socket_ref, socket)
    else
      {:noreply, register_task(socket, task_ref, socket_ref)}
    end
  end

  # True for operation definitions whose operation is `:mutation`.
  defp mutation?(%Absinthe.Language.OperationDefinition{operation: :mutation}), do: true
  defp mutation?(_), do: false

  # Blocks the channel until the worker identified by `task_ref` is down and
  # turns its outcome into a `{:reply, reply, socket}` tuple.
  defp wait_for_task(task_ref, socket_ref, socket) do
    receive do
      {:DOWN, ^task_ref, _, _, :normal} ->
        # The worker sends its result before exiting, so the finish message
        # must already be in the mailbox; hence the zero timeout.
        receive do
          {:async_finish, result, ^socket_ref} ->
            {response, socket} = translate_result(result, socket)
            {:reply, response, socket}
        after
          0 ->
            Logger.error("Absinthe query task exited normally without reporting a result")
            {:reply, :error, socket}
        end

      {:DOWN, ^task_ref, _, _, reason} ->
        Logger.error("Absinthe query task exited abnormally: #{inspect(reason)}")
        {:reply, :error, socket}
    end
  end

  # Converts the outcome of `exec_query/2` into `{reply, socket}`, where
  # `reply` is a `{status, payload}` tuple suitable for a channel reply.
  # Successful subscriptions additionally subscribe the transport to the topic.
  defp translate_result({:ok, %{"subscribed" => topic}, context}, socket) do
    :ok =
      Phoenix.PubSub.subscribe(socket.pubsub_server, topic,
        fastlane: {socket.transport_pid, socket.serializer, []},
        link: true
      )

    {{:ok, %{subscriptionId: topic}}, put_context(socket, context)}
  end

  defp translate_result({:ok, %{data: _} = payload, context}, socket) do
    {{:ok, payload}, put_context(socket, context)}
  end

  defp translate_result({:ok, %{errors: _} = payload, context}, socket) do
    {{:error, payload}, put_context(socket, context)}
  end

  defp translate_result({:error, message}, socket) do
    # A bare message is not a valid channel reply; wrap it in an error tuple
    # with a map payload so the client receives an answer instead of the
    # channel crashing.
    {{:error, %{errors: [%{message: format_error(message)}]}}, socket}
  end

  # Stores the context resulting from an execution in the socket's options.
  defp put_context(socket, context) do
    Absinthe.Phoenix.Socket.put_options(socket, context: context)
  end

  # Renders a pipeline error as a string that can be put into a reply payload.
  defp format_error(message) when is_binary(message), do: message
  defp format_error(other), do: inspect(other)

  # Runs the (already parsed) document through the remaining pipeline phases.
  defp exec_query(doc, pipeline) do
    case Absinthe.Pipeline.run(doc, pipeline) do
      {:ok, %{result: result, execution: res}, _phases} ->
        {:ok, result, res.context}

      {:error, msg, _phases} ->
        {:error, msg}
    end
  end

  defp log_reply(reply) do
    Logger.debug(fn ->
      """
      -- Absinthe Phoenix Reply --
      #{inspect reply}
      ----------------------------
      """
    end)
  end

  # Document pipeline without its leading parse phase; parsing has already
  # happened in the channel process.
  defp skip_parse_pipeline(schema, options) do
    [{Absinthe.Phase.Parse, _} | rest] = Absinthe.Pipeline.for_document(schema, options)
    rest
  end

  # Remembers which socket reference the worker behind `task_ref` answers.
  defp register_task(socket, task_ref, socket_ref) do
    update_tasks(socket, &Map.put(&1, task_ref, socket_ref))
  end

  # Forgets the worker behind `task_ref`; unknown references are a no-op.
  defp unregister_task(socket, task_ref) do
    update_tasks(socket, &Map.delete(&1, task_ref))
  end

  # Applies `fun` to the task registry kept in the `:absinthe` assigns.
  defp update_tasks(socket, fun) do
    absinthe_assigns =
      socket.assigns
      |> Map.get(:absinthe, %{})
      |> Map.update!(:tasks, fun)

    Phoenix.Socket.assign(socket, :absinthe, absinthe_assigns)
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment