Skip to content

Instantly share code, notes, and snippets.

@binaryseed
Last active August 15, 2021 19:56
Show Gist options
  • Save binaryseed/10e937f2dd7c9686d3557e57c00d4033 to your computer and use it in GitHub Desktop.
Save binaryseed/10e937f2dd7c9686d3557e57c00d4033 to your computer and use it in GitHub Desktop.
GraphQl-Subscriptions-Absinthe-SSE

It's possible to serve GraphQL subscriptions over plain HTTP/1.1 using Server-Sent Events!

Start the needed Supervisors...

supervisor(Streamer.Supervisor, [[pubsub: MyPubSub]])

Mount the Plug...

match "/stream/graphql", to: Streamer, init_opts: [schema: MySchema, pubsub: MyPubSub]

The Plug code below is the part that would need to be integrated into Absinthe Plug.

Rough Implementation...

defmodule Streamer.Supervisor do
  @moduledoc """
  Supervises the two processes the streaming plug relies on: the pubsub
  module given in the options and `Absinthe.Subscription` configured with
  that same pubsub.
  """

  use Supervisor

  @doc """
  Starts the supervisor.

  The argument must be exactly `[pubsub: module]`; it is handed unchanged to
  `init/1`.
  """
  def start_link([pubsub: _pubsub_module] = opts),
    do: Supervisor.start_link(__MODULE__, opts)

  @doc """
  Builds the child specifications.

  The pubsub module is started as a worker through its `start_link/0`, and
  `Absinthe.Subscription` is started as a supervisor with the pubsub module
  as its only argument. Children are restarted independently
  (`:one_for_one`).
  """
  @impl true
  def init([pubsub: pubsub_module]) do
    pubsub_worker = worker(pubsub_module, [])
    subscription_supervisor = supervisor(Absinthe.Subscription, [pubsub_module])

    supervise([pubsub_worker, subscription_supervisor], strategy: :one_for_one)
  end
end

defmodule Streamer do
  @moduledoc """
  Plug that serves a GraphQL subscription as a Server-Sent Events stream.

  Requires the `:schema` and `:pubsub` options. For every request it starts a
  linked `Streamer.Runner`, which runs the subscription query and writes the
  events, and then blocks the request process until that runner exits.
  """

  @behaviour Plug

  @doc """
  Returns the options unchanged; they are read in `call/2`.
  """
  @impl true
  def init(opts), do: opts

  @doc """
  Streams subscription events for `conn` until the runner process exits.

  Raises `KeyError` when `:schema` or `:pubsub` is missing from `opts`, and
  `MatchError` when the runner fails to start. Returns the conn on which the
  chunked response was started.
  """
  @impl true
  def call(conn, opts) do
    # Look options up by key so their order (or extra options) does not matter.
    schema = Keyword.fetch!(opts, :schema)
    pubsub = Keyword.fetch!(opts, :pubsub)

    # Just want another process to handle the subscription, not necessarily
    # the perfect implementation. Trapping exits turns the linked runner's
    # termination into a message instead of killing the request process.
    Process.flag(:trap_exit, true)
    {:ok, runner} = Streamer.Runner.start_link(schema, pubsub, conn)

    # The runner called `send_chunked/2` on its own copy of the conn. Fetch
    # that copy: the one we hold is still unsent, so it can neither be
    # chunked to nor returned as a completed response.
    conn = Streamer.Runner.conn(runner)

    receive do
      {:EXIT, ^runner, _reason} ->
        # Best-effort farewell; the client may already have disconnected, in
        # which case `chunk/2` returns an error tuple that we ignore.
        _ = Plug.Conn.chunk(conn, "BYE\n\n")
    end

    conn
  end
end

defmodule Streamer.Runner do
  @moduledoc """
  Per-request process that runs a GraphQL subscription query, subscribes to
  the resulting topic on the pubsub, and relays every broadcast to the client
  as a Server-Sent Events `data:` frame.
  """

  use GenServer

  @doc """
  Starts a runner linked to the calling process.

  The process is intentionally not name-registered, so any number of runners
  (one per open stream) can be alive at the same time.
  """
  def start_link(schema, pubsub, conn) do
    GenServer.start_link(__MODULE__, schema: schema, pubsub: pubsub, conn: conn)
  end

  @doc """
  Returns the runner's current `Plug.Conn`, i.e. the conn on which the
  chunked response has been started.
  """
  def conn(runner), do: GenServer.call(runner, :conn)

  @impl true
  def init(schema: schema, pubsub: pubsub, conn: conn) do
    # NOTE(review): relies on `conn.private.absinthe.context` being present,
    # presumably set by an earlier Absinthe plug in the pipeline - confirm.
    context = Map.merge(conn.private.absinthe.context, %{pubsub: pubsub})

    # Anything but a successful subscription result fails the match, which
    # aborts start-up before any response has been sent.
    {:ok, %{"subscribed" => topic}} =
      Absinthe.run(conn.params["query"], schema, context: context)

    pubsub.subscribe(topic)

    conn =
      conn
      |> Plug.Conn.put_resp_header("content-type", "text/event-stream")
      |> Plug.Conn.send_chunked(200)

    {:ok, %{conn: conn, topic: topic}}
  end

  @impl true
  def handle_call(:conn, _from, %{conn: conn} = state), do: {:reply, conn, state}

  @impl true
  def handle_info({:broadcast, msg}, %{conn: conn} = state) do
    case Plug.Conn.chunk(conn, "data: #{Poison.encode!(msg.result.data)}\n\n") do
      {:ok, conn} ->
        {:noreply, %{state | conn: conn}}

      {:error, _reason} ->
        # The chunk could not be written (e.g. the client went away); stop so
        # the process does not outlive its stream.
        {:stop, :normal, state}
    end
  end
end

And a dummy local PubSub example...

defmodule MyPubSub do
  @moduledoc """
  Minimal node-local pubsub built on `Registry`.

  Subscribers register themselves under a topic; publishing sends
  `{:broadcast, message}` to every process registered for that topic. It does
  not forward anything to other nodes.
  """

  @behaviour Absinthe.Subscription.Pubsub

  @doc """
  Starts the registry backing this pubsub, named after this module.
  """
  def start_link() do
    # `:duplicate` keys are required for pubsub: with `:unique` keys only the
    # first process could register for a topic and every later subscriber
    # would silently receive nothing.
    Registry.start_link(:duplicate, __MODULE__)
  end

  @doc """
  Subscribes the calling process to `topic`. Returns `:ok`.

  Each call adds one registration, so a process that subscribes twice to the
  same topic receives each broadcast twice.
  """
  def subscribe(topic) do
    {:ok, _owner} = Registry.register(__MODULE__, topic, [])
    :ok
  end

  @doc """
  Sends `{:broadcast, message}` to every process subscribed to `topic`, where
  `message` is a map with the keys `:topic`, `:event` and `:result`
  (`result` being `data`).
  """
  def publish_subscription(topic, data) do
    message = %{
      topic: topic,
      event: "subscription:data",
      result: data
    }

    Registry.dispatch(__MODULE__, topic, fn entries ->
      for {pid, _} <- entries, do: send(pid, {:broadcast, message})
    end)
  end

  @doc """
  No-op that always returns `:ok`.
  """
  def publish_mutation(_proxy_topic, _mutation_result, _subscribed_fields) do
    # this pubsub is local and doesn't support clusters
    :ok
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment