Absinthe subscription replay functionality
defmodule Web.AbsintheChannel do
  @moduledoc """
  A custom channel that uses most of what Absinthe's
  channel has to offer, but with additional handling
  for 'replay'.
  """
  use Phoenix.Channel

  alias Web.Subscription.Message

  def handle_in("replay", %{"replayId" => replay_id, "subscriptionId" => doc_id}, socket) do
    replay_id
    |> Message.since(doc_id_topics(doc_id, socket))
    |> Enum.each(&Message.push(&1, docs(socket, &1.topics, doc_id), socket))

    {:noreply, socket}
  end

  # All other events are delegated to Absinthe's channel; we only intercept
  # our custom `replay` event. Ideally Absinthe will be improved to support
  # either custom event handling or replay functionality on its own
  # at some point.
  defdelegate handle_in(event, payload, socket), to: Absinthe.Phoenix.Channel
  defdelegate join(channel, message, socket), to: Absinthe.Phoenix.Channel

  # Looks up the Absinthe subscription field keys (topics) registered for
  # this socket process and subscription document id.
  defp doc_id_topics(doc_id, socket) do
    socket.endpoint
    |> Absinthe.Subscription.registry_name()
    |> Registry.lookup({self(), doc_id})
    |> Enum.map(fn {_self, field_key} -> field_key end)
  end

  # Fetches the subscription documents registered under the given topics,
  # restricted to the requested document id.
  defp docs(socket, topics, doc_id) do
    topics
    |> Enum.flat_map(fn topic ->
      socket.endpoint
      |> Absinthe.Subscription.get(topic)
      |> Enum.filter(fn {id, _doc} -> id == doc_id end)
      |> Enum.map(fn {_id, doc} -> doc end)
    end)
  end
end
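# Hypothetical usage sketch (not part of the original gist): the payload a
# reconnecting client is expected to push on the custom "replay" event.
# `last_replay_id` is the replay_id of the last message the client received;
# `subscription_id` is the subscriptionId that Absinthe.Phoenix returned when
# the subscription document was first submitted. The module and function names
# below are illustrative assumptions.
defmodule Web.ReplayRequest do
  @doc "Builds the params for a `push(channel, \"replay\", params)` call."
  def params(last_replay_id, subscription_id) do
    %{"replayId" => last_replay_id, "subscriptionId" => subscription_id}
  end
end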
defmodule Web.Subscription.Cleanup do
  @moduledoc """
  A process to periodically clean up subscription
  messages that are stored for replay functionality.
  """
  use Task

  require Logger

  @interval 1000 * 60

  def start_link(_arg) do
    Task.start_link(&poll_cleanup/0)
  end

  # Sleeps for @interval milliseconds by using `receive` with only an `after`
  # clause, then deletes expired replay messages and loops.
  def poll_cleanup() do
    receive do
    after
      @interval ->
        count = Web.Subscription.Message.cleanup()
        Logger.info("Cleaned up expired subscription messages", count: count)
        poll_cleanup()
    end
  end
end
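# Hypothetical sketch (not part of the original gist): because the module calls
# `use Task`, it can be started directly under the application's supervision
# tree. The application module and supervisor name below are assumptions.
defmodule Web.Application do
  use Application

  def start(_type, _args) do
    children = [
      # ... endpoint, Absinthe.Subscription supervisor, and other children elided ...
      Web.Subscription.Cleanup
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: Web.Supervisor)
  end
end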
defmodule Db.EtsTable do
  @moduledoc """
  Helper functionality for easily interacting with
  Erlang Term Storage in ways we commonly use.
  """
  defmacro __using__(table: table, type: type) do
    quote do
      def ets_lookup(key) do
        try_create_ets_table()
        :ets.lookup(unquote(table), key)
      end

      def ets_insert(record) do
        try_create_ets_table()
        :ets.insert_new(unquote(table), record)
      end

      def ets_delete(key) do
        try_create_ets_table()
        :ets.delete(unquote(table), key)
      end

      # Lazily streams every key in the table, walking it with
      # :ets.first/1 and :ets.next/2.
      def ets_keys() do
        try_create_ets_table()

        Stream.resource(
          fn -> :ets.first(unquote(table)) end,
          fn
            :"$end_of_table" -> {:halt, nil}
            previous_key -> {[previous_key], :ets.next(unquote(table), previous_key)}
          end,
          fn _ -> :ok end
        )
      end

      # Creates the named table on first use; later calls are no-ops.
      defp try_create_ets_table do
        case :ets.info(unquote(table)) do
          :undefined ->
            {:created, :ets.new(unquote(table), [unquote(type), :public, :named_table])}

          table_name ->
            {:already_exists, table_name}
        end
      end
    end
  end
end
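# Hypothetical sketch (not part of the original gist) showing how the helper is
# meant to be pulled in: `use Db.EtsTable` injects ets_lookup/1, ets_insert/1,
# ets_delete/1 and ets_keys/0 backed by a lazily created named table. The
# module and table names here are illustrative.
defmodule Db.ExampleCache do
  use Db.EtsTable, table: :example_cache, type: :set

  def put(key, value), do: ets_insert({key, value})

  def get(key) do
    case ets_lookup(key) do
      [{^key, value}] -> {:ok, value}
      [] -> :not_found
    end
  end
end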
defmodule Web.Subscription.Message do
  @moduledoc """
  An abstraction layer around message publishing and
  replay storage/functionality.
  """
  use Db.EtsTable, table: :subscription_messages, type: :bag

  defstruct [:replay_id, :payload, :topics]

  alias Web.Subscription.Message

  # Only the resolution/result phases are needed when re-running an
  # already-parsed subscription document against a stored payload.
  @pipeline [
    Absinthe.Phase.Document.Execution.Resolution,
    Absinthe.Phase.Document.Result
  ]

  # One hour, in microseconds (replay ids are microsecond timestamps).
  @ttl 1_000_000 * 60 * 60

  # Publishes through Absinthe as usual, and also stores the message in ETS
  # so it can be replayed later.
  def publish(message, topics) do
    replay_id = calculate_replay_id()
    data = Map.put(message, :replay_id, replay_id)

    Absinthe.Subscription.publish(Web.Endpoint, data, topics)
    ets_insert({replay_id, data, topics})
  end

  # Re-resolves a stored message against each subscription document and
  # pushes the result down the channel.
  def push(%Message{} = msg, docs, socket) do
    for doc <- docs do
      {:ok, %{result: data}, _} = resolve(msg, doc)
      Phoenix.Channel.push(socket, "subscription:data", %{result: data})
    end
  end

  # Streams all stored messages newer than the given replay id, with their
  # topics narrowed to the ones the caller is subscribed to.
  def since(replay_id, topics) do
    ets_keys()
    |> Stream.filter(&(&1 > replay_id))
    |> Stream.flat_map(&ets_lookup(&1))
    |> Stream.map(&from_ets/1)
    |> Stream.map(&match_topics(&1, topics))
  end

  # Deletes messages older than @ttl and returns how many were removed.
  def cleanup() do
    ets_keys()
    |> Stream.filter(&expired/1)
    |> Stream.each(&ets_delete/1)
    |> Enum.count()
  end

  defp resolve(%Message{payload: payload}, %Absinthe.Blueprint{} = doc) do
    doc.execution.root_value
    |> put_in(payload)
    |> Absinthe.Pipeline.run(@pipeline)
  end

  defp from_ets({key, payload, topics}),
    do: %Message{replay_id: key, payload: payload, topics: topics}

  defp match_topics(%Message{topics: msg_topics} = msg, topics),
    do: %{msg | topics: MapSet.intersection(MapSet.new(topics), MapSet.new(msg_topics))}

  defp calculate_replay_id, do: DateTime.utc_now() |> DateTime.to_unix(:microsecond)

  defp expired(key), do: key < calculate_replay_id() - @ttl
end
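# Hypothetical sketch (not part of the original gist): publishing through the
# wrapper instead of Absinthe.Subscription.publish/3 directly, so the message
# also gets a replay_id and a copy in ETS for later replay. The field and
# topic names are illustrative assumptions.
defmodule Web.Comments do
  # `comment` is assumed to be a plain map so the wrapper can add :replay_id.
  def comment_added(%{post_id: post_id} = comment) do
    Web.Subscription.Message.publish(comment, comment_added: "post:#{post_id}")
  end
end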
defmodule Web.UserSocket do
  @moduledoc """
  Custom socket that allows us to specify our custom channel, for replays.
  """
  use Phoenix.Socket

  transport(:websocket, Phoenix.Transports.WebSocket)

  def connect(_payload, socket), do: {:ok, socket}
  def id(_socket), do: nil

  # Copied from & delegated to Absinthe in order to specify a custom channel.
  # Ideally Absinthe will be improved to support either custom event handling
  # or replay functionality on its own at some point.
  channel(
    "__absinthe__:*",
    Web.AbsintheChannel,
    assigns: %{__absinthe_schema__: Web.Schema}
  )

  defdelegate put_opts(socket, opts), to: Absinthe.Phoenix.Socket
  defdelegate put_schema(socket, schema), to: Absinthe.Phoenix.Socket
end
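# Hypothetical sketch (not part of the original gist): the custom socket still
# has to be mounted in the endpoint, just like Absinthe.Phoenix's stock socket
# would be. The otp_app name is an assumption.
defmodule Web.Endpoint do
  use Phoenix.Endpoint, otp_app: :my_app
  use Absinthe.Phoenix.Endpoint

  socket("/socket", Web.UserSocket)

  # ... plugs elided ...
end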