Skip to content

Instantly share code, notes, and snippets.

@elfenlaid
Created May 16, 2024 13:59
Show Gist options
  • Save elfenlaid/5a7cbd32a0161e14e3f5a12492c390bc to your computer and use it in GitHub Desktop.
Save elfenlaid/5a7cbd32a0161e14e3f5a12492c390bc to your computer and use it in GitHub Desktop.
defmodule CollectionListener do
  @moduledoc """
  GenServer that watches a MongoDB collection and forwards every change to a
  caller-supplied handler.

  After `init/1` the server starts a linked task which calls
  `Mongo.watch_collection/4` and enumerates the returned cursor. The task
  reports back to the server with two messages:

    * `{:new_token, token}` - sent from the callback given to
      `Mongo.watch_collection/4` (presumably the change-stream resume token;
      confirm against the driver docs). The latest value is kept in the state.
    * `{:change, document}` - sent for every element yielded by the cursor.
      The handler is invoked as `handler.(token, document)` with the most
      recently stored token.

  The handler runs inside the GenServer process, so an exception raised by the
  handler crashes the listener.

  ## Options

    * `:topology` (required) - first argument passed to `Mongo.watch_collection/4`.
    * `:collection` (required) - second argument passed to `Mongo.watch_collection/4`.
    * `:handler` (required) - two-arity function receiving `(token, document)`.
    * `:name` (optional) - name used to register the process, see `start_link/1`.
  """

  use GenServer

  defstruct [:topology, :collection, :handler, :cursor_task, :token]

  @typedoc "Callback invoked for each change with the latest token and the change document."
  @type handler :: (token :: term(), document :: term() -> any())

  @type t :: %__MODULE__{
          topology: term(),
          collection: term(),
          handler: handler() | nil,
          cursor_task: pid() | nil,
          token: term()
        }

  @doc """
  Starts the listener linked to the calling process.

  `args` is the keyword list described in the module documentation. It is
  passed unchanged to `init/1`; an optional `:name` entry is additionally
  forwarded to `GenServer.start_link/3` for process registration.

  Defining this function makes the `child_spec/1` generated by `use GenServer`
  usable, so the module can be supervised as `{CollectionListener, args}`.
  """
  @spec start_link(keyword()) :: GenServer.on_start()
  def start_link(args) when is_list(args) do
    GenServer.start_link(__MODULE__, args, Keyword.take(args, [:name]))
  end

  @impl true
  def init(args) do
    # fetch!/2 raises KeyError on a missing option, failing start-up early.
    state = %__MODULE__{
      topology: Keyword.fetch!(args, :topology),
      collection: Keyword.fetch!(args, :collection),
      handler: Keyword.fetch!(args, :handler)
    }

    # Opening the cursor is deferred so that init/1 returns immediately.
    {:ok, state, {:continue, :init_cursor}}
  end

  @impl true
  def handle_continue(:init_cursor, %__MODULE__{} = state) do
    listener = self()

    {:ok, cursor_task} =
      Task.start_link(fn ->
        state.topology
        |> Mongo.watch_collection(state.collection, [], &send(listener, {:new_token, &1}))
        |> Enum.each(&send(listener, {:change, &1}))
      end)

    # The link only propagates abnormal exits. Monitoring lets the server also
    # notice the task finishing normally (the cursor enumeration ending), which
    # would otherwise leave the listener alive but receiving nothing.
    Process.monitor(cursor_task)

    {:noreply, %{state | cursor_task: cursor_task}}
  end

  @impl true
  def handle_info({:new_token, token}, %__MODULE__{} = state) do
    {:noreply, %{state | token: token}}
  end

  def handle_info({:change, document}, %__MODULE__{} = state) do
    state.handler.(state.token, document)
    {:noreply, state}
  end

  # The cursor task is gone, so no further changes can arrive. Stop with a
  # descriptive reason so a supervisor can restart the listener.
  def handle_info({:DOWN, _ref, :process, pid, reason}, %__MODULE__{cursor_task: pid} = state) do
    {:stop, {:cursor_task_exited, reason}, state}
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment