Skip to content

Instantly share code, notes, and snippets.

@cybernetlab
Created January 18, 2017 15:30
Show Gist options
  • Save cybernetlab/4b187a8ae9f96c0a8766f86db9e2f53c to your computer and use it in GitHub Desktop.
Save cybernetlab/4b187a8ae9f96c0a8766f86db9e2f53c to your computer and use it in GitHub Desktop.
absinthe subscriptions
defmodule MyApp.GraphQLChannel do
  @moduledoc """
  Phoenix channel that runs GraphQL documents through Absinthe and keeps a
  per-socket list of subscriptions in `socket.assigns[:subscriptions]`.

  Incoming events on the `"graphql"` topic:

    * `"run"` - executes `body` once and replies with the result.
    * `"subscribe"` - executes `body` with `subscribe: true` in the context and,
      when the run yields `{:ok, {:subscribe, lookup}}`, stores the subscription
      under the client-supplied `id`.
    * `"unsubscribe"` - drops the subscription stored under `id`.

  Outgoing `"subscription"` broadcasts are intercepted and checked against every
  stored subscription. Matching updates re-run the stored document and push the
  result; matching deletes push the deleted ids.

  Every reply payload is normalised to a map (see `to_reply/1`), because Phoenix
  releases that only accept map payloads raise on anything else.
  """

  use Phoenix.Channel
  require Logger
  alias MyApp.Schema

  intercept ["subscription"]

  # Only the "graphql" topic can be joined; every socket starts without subscriptions.
  def join("graphql", _params, socket) do
    {:ok, assign(socket, :subscriptions, [])}
  end

  # "run": one-shot execution of the document in `body`.
  def handle_in("run", %{"body" => body} = request, socket) do
    {:reply, to_reply(Absinthe.run(body, Schema, gql_options(request))), socket}
  end

  # "subscribe": the document is first run with `subscribe: true` in the context.
  # The options stored with the subscription deliberately do NOT carry that flag,
  # so later re-runs in `run_subscription/3` go through normal resolution.
  def handle_in("subscribe", %{"body" => body, "id" => id} = request, socket) do
    opts = gql_options(request, %{id: id})
    subscribe_opts = put_in(opts, [:context, :subscribe], true)

    with {:ok, {:subscribe, lookup}} <- Absinthe.run(body, Schema, subscribe_opts),
         subscription = %{id: id, body: body, lookup: lookup, options: opts},
         {:ok, socket} <- add_subscription(subscription, socket) do
      {:reply, :ok, socket}
    else
      # Anything else (Absinthe result without a subscription, Absinthe error,
      # duplicate id) is passed back to the client; the socket is left untouched.
      other -> {:reply, to_reply(other), socket}
    end
  end

  # "unsubscribe": removes the subscription registered under `id`.
  def handle_in("unsubscribe", %{"id" => id}, socket) do
    case remove_subscription(id, socket) do
      {:ok, socket} -> {:reply, :ok, socket}
      error -> {:reply, to_reply(error), socket}
    end
  end

  # Intercepted broadcast: offer the message to every stored subscription.
  def handle_out("subscription", msg, socket) do
    socket = Enum.reduce(socket.assigns[:subscriptions], socket, &update(&1, msg, &2))
    {:noreply, socket}
  end

  # Converts a handler result into a reply Phoenix accepts: a bare status atom
  # or `{status, map}`. Non-map payloads are wrapped as `%{reason: ...}`
  # (strings verbatim, other terms via `inspect/1`).
  defp to_reply(status) when is_atom(status), do: status

  defp to_reply({status, payload}) when is_atom(status) and is_map(payload),
    do: {status, payload}

  defp to_reply({status, reason}) when is_atom(status) and is_binary(reason),
    do: {status, %{reason: reason}}

  defp to_reply({status, reason}) when is_atom(status),
    do: {status, %{reason: inspect(reason)}}

  defp to_reply(other), do: {:error, %{reason: inspect(other)}}

  # Builds the keyword options handed to `Absinthe.run/3` from the client request.
  # "variables" defaults to an empty map; "operation_name" is forwarded under the
  # `:operation_name` key only when the client supplied it.
  defp gql_options(request, context \\ %{}) do
    options = [variables: Map.get(request, "variables", %{}), context: context]

    case Map.get(request, "operation_name") do
      nil -> options
      name -> Keyword.put(options, :operation_name, name)
    end
  end

  # Update for a subscription on every record of `model`: re-run with all values.
  defp update(
         %{lookup: {model, :any}} = subscription,
         %{data: data, action: :update, model: data_model},
         socket
       )
       when data_model == model do
    run_subscription(Map.values(data), subscription, socket)
  end

  # Update for a subscription on a single record: re-run only if that record's
  # key is present in the message data.
  defp update(
         %{lookup: {model, data_id}} = subscription,
         %{data: data, action: :update, model: data_model},
         socket
       )
       when data_model == model do
    case Map.get(data, data_id) do
      nil -> socket
      record -> run_subscription(record, subscription, socket)
    end
  end

  # Delete: push the deleted ids this subscription covers (all of them for
  # `:any`, otherwise only the subscribed id). Nothing is pushed when none match.
  defp update(
         %{lookup: {model, data_id}} = subscription,
         %{data: ids, action: :delete, model: data_model},
         socket
       )
       when data_model == model do
    matching =
      case data_id do
        :any -> ids
        id -> Enum.filter(ids, &(&1 == id))
      end

    if matching != [] do
      push(socket, "subscription", %{id: subscription.id, delete: matching})
    end

    socket
  end

  # Any other subscription/message combination is only logged.
  defp update(subscription, msg, socket) do
    Logger.warn("GraphQL Channel: Unhandled update: #{inspect subscription} #{inspect msg}")
    socket
  end

  # Re-runs the stored document with `data` placed under `context.subscription`
  # and pushes the result tagged with the subscription id. Failures are logged
  # and otherwise ignored; the socket is returned unchanged either way.
  defp run_subscription(data, subscription, socket) do
    opts = put_in(subscription.options, [:context, :subscription], data)

    case Absinthe.run(subscription.body, Schema, opts) do
      {:ok, result} ->
        push(socket, "subscription", Map.put(result, :id, subscription.id))

      error ->
        Logger.warn("GraphQL Channel: Error while processing subscription: " <> inspect(error))
    end

    socket
  end

  # Registers `subscription` unless its id is already taken on this socket.
  defp add_subscription(%{id: id} = subscription, socket) do
    subscriptions = socket.assigns[:subscriptions]

    if Enum.any?(subscriptions, &(&1.id == id)) do
      {:error, "subscription with id #{id} already exists"}
    else
      {:ok, assign(socket, :subscriptions, [subscription | subscriptions])}
    end
  end

  # Drops the subscription stored under `id`, or reports that none exists.
  defp remove_subscription(id, socket) do
    subscriptions = socket.assigns[:subscriptions]

    if Enum.any?(subscriptions, &(&1.id == id)) do
      {:ok, assign(socket, :subscriptions, Enum.reject(subscriptions, &(&1.id == id)))}
    else
      {:error, "subscription with id #{id} not found"}
    end
  end
end
defmodule MyApp.Schema do
  @moduledoc """
  Absinthe schema exposing `MyApp.Model` records.

  Besides the plain `models` query it declares two subscription fields whose
  resolvers hand off to `MyApp.Subscription.subscribe/2` with a
  `{Model, :any}` or `{Model, id}` lookup term.
  """

  use Absinthe.Schema

  alias Absinthe.Resolution.Plugin
  alias MyApp.Model
  alias MyApp.Subscription

  import Subscription, only: [subscribe: 2]

  # Default Absinthe plugins plus the subscription plugin.
  def resolution_plugins() do
    Plugin.defaults() ++ [Subscription]
  end

  query do
    field :models, list_of(:model) do
      # NOTE(review): `Mnesia.select/1` is defined outside this file; presumably
      # it returns every stored Model record - confirm.
      resolve fn _args, _resolution ->
        serialized = Enum.map(Mnesia.select(Model), &Model.to_json/1)
        {:ok, serialized}
      end
    end
  end

  subscription do
    # Subscribes to every Model record.
    field :subscribe_models, type: list_of(:model) do
      resolve fn _args, resolution ->
        subscribe(resolution, {Model, :any})
      end
    end

    # Subscribes to the single Model record identified by the `id` argument.
    field :subscribe_model, type: :model do
      arg :id, non_null(:id)

      resolve fn %{id: model_id}, resolution ->
        subscribe(resolution, {Model, model_id})
      end
    end
  end

  object :model do
    field :id, :id
    field :name, :string
  end
end
defmodule MyApp.Subscription do
  @moduledoc """
  Absinthe resolution plugin used by the subscription fields.

  A resolver calls `subscribe/2` with the resolution (or a context map) and a
  lookup term. The plugin records the `{context, term}` pair in the resolution
  accumulator and then adjusts the pipeline depending on what the context holds:

    * `subscribe: true` - the rest of the pipeline is replaced by
      `FinishResolution`, which makes the run return `{:subscribe, term}`.
    * a `:subscription` key - another resolution phase is prepended, during
      which `resolve/2` returns the data stored under that key.
  """

  alias Absinthe.Blueprint
  alias Absinthe.Resolution
  alias Absinthe.Resolution.Plugin

  @behaviour Plugin

  defmodule FinishResolution do
    @moduledoc """
    Pipeline phase that ends a subscribing run early.

    When the phase options carry a context with `subscribe: true`, the pipeline
    result is replaced by `{:subscribe, term}` and no further phases run.
    """

    def run(input, opts) do
      case Keyword.get(opts, :context) do
        %{subscribe: true} ->
          {:replace, {:subscribe, Keyword.get(opts, :subscription)}, []}

        _ ->
          # Phases must return a tagged result; pass the input through untouched.
          {:ok, input}
      end
    end
  end

  # Resolver helper: defers the field to this plugin, carrying the context and
  # the lookup term. Accepts either a resolution struct or a bare context map.
  def subscribe(%Resolution{context: context}, term) do
    {:plugin, __MODULE__, {context, term}}
  end

  def subscribe(context, term) when is_map(context) do
    {:plugin, __MODULE__, {context, term}}
  end

  # Clears this plugin's slot in the accumulator before each resolution pass.
  def before_resolution(acc) do
    Map.put(acc, __MODULE__, nil)
  end

  # Remembers the `{context, {model, id}}` pair produced by `subscribe/2` so
  # that `pipeline/2` can inspect it. Any other data is passed through.
  def init({_, {_, _}} = sub, acc) do
    {sub, Map.put(acc, __MODULE__, sub)}
  end

  def init(data, acc), do: {data, acc}

  def after_resolution(acc) do
    acc
  end

  # Chooses how the run continues, based on the pair stored by `init/2`.
  def pipeline(pipeline, acc) do
    case acc[__MODULE__] do
      nil ->
        pipeline

      {%{subscribe: true} = context, subscription} ->
        [{FinishResolution, [context: context, subscription: subscription]}]

      {%{subscription: _}, _lookup} ->
        [Absinthe.Phase.Document.Execution.Resolution | pipeline]
    end
  end

  # Resolves the deferred field with the data stored under `context.subscription`.
  def resolve({%{subscription: data}, _}, acc) do
    {{:ok, data}, acc}
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment