-
-
Save athal7/27ce6dba44b092710b968a1ff1ba45f7 to your computer and use it in GitHub Desktop.
Absinthe subscription replay functionality
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Web.AbsintheChannel do | |
@moduledoc """ | |
A custom channel that uses most of what Absinthe's | |
channel has to offer, but with additional handling | |
for 'replay'. | |
""" | |
use Phoenix.Channel | |
alias Web.Subscription.Message | |
def handle_in("replay", %{"replayId" => replay_id, "subscriptionId" => doc_id}, socket) do | |
replay_id | |
|> Message.since(doc_id_topics(doc_id, socket)) | |
|> Enum.each(&Message.push(&1, docs(socket, &1.topics, doc_id), socket)) | |
{:noreply, socket} | |
end | |
# Delegated to absinthe in order to handle our custom `replay` event. | |
# Ideally Absinthe will be improved to support either | |
# custom event handling, or replay functionality on it's own | |
# at some point. | |
defdelegate handle_in(event, payload, socket), to: Absinthe.Phoenix.Channel | |
defdelegate join(channel, message, socket), to: Absinthe.Phoenix.Channel | |
defp doc_id_topics(doc_id, socket) do | |
socket.endpoint | |
|> Absinthe.Subscription.registry_name() | |
|> Registry.lookup({self(), doc_id}) | |
|> Enum.map(fn {_self, field_key} -> field_key end) | |
end | |
defp docs(socket, topics, doc_id) do | |
topics | |
|> Enum.flat_map(fn topic -> | |
socket.endpoint | |
|> Absinthe.Subscription.get(topic) | |
|> Enum.filter(fn {id, _doc} -> id == doc_id end) | |
|> Enum.map(fn {_id, doc} -> doc end) | |
end) | |
end | |
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Web.Subscription.Cleanup do | |
@moduledoc """ | |
A process to periodically cleanup subscription | |
messages that are stored for replay functionality. | |
""" | |
use Task | |
import Logger | |
@interval 1000 * 60 | |
def start_link(_arg) do | |
Task.start_link(&poll_cleanup/0) | |
end | |
def poll_cleanup() do | |
receive do | |
after | |
@interval -> | |
count = Web.Subscription.Message.cleanup() | |
Logger.info("Cleaned up expired subscription messages", count: count) | |
poll_cleanup() | |
end | |
end | |
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Db.EtsTable do | |
@moduledoc """ | |
Helper functionality for easily interacting with | |
Erlang Term Storage in ways we commonly use. | |
""" | |
defmacro __using__(table: table, type: type) do | |
quote do | |
def ets_lookup(key) do | |
try_create_ets_table() | |
:ets.lookup(unquote(table), key) | |
end | |
def ets_insert(record) do | |
try_create_ets_table() | |
:ets.insert_new(unquote(table), record) | |
end | |
def ets_delete(key) do | |
try_create_ets_table() | |
:ets.delete(unquote(table), key) | |
end | |
def ets_keys() do | |
try_create_ets_table() | |
Stream.resource( | |
fn -> :ets.first(unquote(table)) end, | |
fn | |
:"$end_of_table" -> {:halt, nil} | |
previous_key -> {[previous_key], :ets.next(unquote(table), previous_key)} | |
end, | |
fn _ -> :ok end | |
) | |
end | |
defp try_create_ets_table do | |
case :ets.info(unquote(table)) do | |
:undefined -> | |
{:created, :ets.new(unquote(table), [unquote(type), :public, :named_table])} | |
table_name -> | |
{:already_exists, table_name} | |
end | |
end | |
end | |
end | |
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Web.Subscription.Message do | |
@moduledoc """ | |
An abstraction layer around message publishing and | |
replay storage / functionality | |
""" | |
use Db.EtsTable, table: :subscription_messages, type: :bag | |
defstruct [:replay_id, :payload, :topics] | |
alias Web.Subscription.Message | |
@pipeline [ | |
Absinthe.Phase.Document.Execution.Resolution, | |
Absinthe.Phase.Document.Result | |
] | |
@ttl 1_000_000 * 60 * 60 | |
def publish(message, topics) do | |
replay_id = calculate_replay_id() | |
data = Map.put(message, :replay_id, replay_id) | |
Absinthe.Subscription.publish(Web.Endpoint, data, topics) | |
ets_insert({replay_id, data, topics}) | |
end | |
def push(%Message{} = msg, docs, socket) do | |
for doc <- docs do | |
{:ok, %{result: data}, _} = resolve(msg, doc) | |
Phoenix.Channel.push(socket, "subscription:data", %{result: data}) | |
end | |
end | |
def since(replay_id, topics) do | |
ets_keys() | |
|> Stream.filter(&(&1 > replay_id)) | |
|> Stream.flat_map(&ets_lookup(&1)) | |
|> Stream.map(&from_ets/1) | |
|> Stream.map(&match_topics(&1, topics)) | |
end | |
def cleanup() do | |
ets_keys() | |
|> Stream.filter(&expired/1) | |
|> Stream.each(&ets_delete/1) | |
|> Enum.count() | |
end | |
defp resolve(%Message{payload: payload}, %Absinthe.Blueprint{} = doc) do | |
doc.execution.root_value | |
|> put_in(payload) | |
|> Absinthe.Pipeline.run(@pipeline) | |
end | |
defp from_ets({key, payload, topics}), | |
do: %Message{replay_id: key, payload: payload, topics: topics} | |
defp match_topics(%Message{topics: msg_topics} = msg, topics), | |
do: %{msg | topics: MapSet.intersection(MapSet.new(topics), MapSet.new(msg_topics))} | |
defp calculate_replay_id, do: DateTime.utc_now() |> DateTime.to_unix(:microsecond) | |
defp expired(key), do: key < calculate_replay_id() - @ttl | |
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Web.UserSocket do | |
@moduledoc """ | |
Custom socket that allows us to specify our custom channel, for replays. | |
""" | |
use Phoenix.Socket | |
transport(:websocket, Phoenix.Transports.WebSocket) | |
def connect(_payload, socket), do: {:ok, socket} | |
def id(_socket), do: nil | |
# Copied from & delegated to absinthe in order to specify a custom channel. | |
# Ideally Absinthe will be improved to support either | |
# custom event handling, or replay functionality on it's own | |
# at some point. | |
channel( | |
"__absinthe__:*", | |
Web.AbsintheChannel, | |
assigns: %{__absinthe_schema__: Web.Schema} | |
) | |
defdelegate put_opts(socket, opts), to: Absinthe.Phoenix.Socket | |
defdelegate put_schema(socket, schema), to: Absinthe.Phoenix.Socket | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment