Skip to content

Instantly share code, notes, and snippets.

@mnussbaumer
Last active March 4, 2019 16:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mnussbaumer/270e10a89733b860489bce230cb615da to your computer and use it in GitHub Desktop.
Save mnussbaumer/270e10a89733b860489bce230cb615da to your computer and use it in GitHub Desktop.
ETS Swapper
defmodule ETSManager do
defmodule Backup do
  @moduledoc """
  Heir process for the tables created by `ETSManager`.

  The manager creates its tables with `{:heir, backup_pid, name}`. When the
  manager dies, ETS sends this process one `:"ETS-TRANSFER"` message per table;
  the tables are kept here until a restarted manager asks for them back with a
  `{:table_request, pid}` call.
  """
  use GenServer
  require Logger

  # `main` holds the manager's registry table (transferred with heir data
  # `:main`); `tables` holds `{table, name}` pairs for every other table.
  defstruct main: nil, tables: []

  @doc """
  Starts the backup server registered under its module name.

  Returns `:ignore` when a server with that name is already running, and passes
  any other start error through unchanged.
  """
  def start_link(_) do
    # The log call must sit outside the `case`: an expression placed before the
    # first `->` clause of a `case` body does not compile.
    Logger.info("Starting ETSManager.Backup...")

    case GenServer.start_link(__MODULE__, nil, name: __MODULE__) do
      {:ok, pid} -> {:ok, pid}
      {:error, {:already_started, _pid}} -> :ignore
      error -> error
    end
  end

  @impl true
  def init(_args) do
    {:ok, %__MODULE__{}}
  end

  # The registry table arrives tagged with the heir data `:main`.
  @impl true
  def handle_info({:"ETS-TRANSFER", table, _from, :main}, state) do
    Logger.warn("ETSManager was killed - :main cache received...")
    {:noreply, %{state | main: table}}
  end

  # Any other inherited table is remembered together with its heir data (its name).
  def handle_info({:"ETS-TRANSFER", table, _from, name}, %{tables: tables} = state) do
    Logger.warn("ETSManager was killed - table #{inspect name} received...")
    {:noreply, %{state | tables: [{table, name} | tables]}}
  end

  # Without a catch-all, an unexpected message would raise a FunctionClauseError,
  # terminating this process and, with it, every table it currently owns.
  def handle_info(message, state) do
    Logger.warn("ETSManager.Backup ignored unexpected message #{inspect message}")
    {:noreply, state}
  end

  # Nothing is being held: report it and leave the state untouched.
  @impl true
  def handle_call({:table_request, _}, _, %{main: nil, tables: []} = state) do
    Logger.error("Tables recovery was requested but no tables are being held...")
    {:reply, {:error, :no_table}, state}
  end

  # Hands every held table over to `pid` and forgets them. Each give-away makes
  # ETS send an `:"ETS-TRANSFER"` message to `pid`.
  def handle_call({:table_request, pid}, _from, %{main: main, tables: tables}) do
    case main do
      nil -> Logger.warn("Tables recovery was requested but no :main table is being held...")
      _ -> :ets.give_away(main, pid, :main)
    end

    Enum.each(tables, fn {table, name} ->
      Logger.info("Returning table #{inspect name} to ETSManager...")
      :ets.give_away(table, pid, name)
    end)

    {:reply, :ok, %__MODULE__{}}
  end
end
defmodule Entity do
  @moduledoc """
  Describes one managed ETS table.

    * `:name` - identifier used to address the table through `ETSManager`
    * `:tasker` - the `Task.Supervisor` under which the populate callback runs
    * `:options` - options handed to `:ets.new/2` (an heir option is prepended)
    * `:populate_callback` - `false` for no population, otherwise a function
      returning the entries to insert; called with no arguments when `:args` is
      `[]`, and with `args` as its single argument otherwise
    * `:args` - argument for the populate callback
  """
  @enforce_keys [:name, :tasker]
  defstruct [:name, :tasker, options: [:public, read_concurrency: true], populate_callback: false, args: []]

  # `options` is a plain list because ETS options mix bare atoms and tuples
  # (the default is not a keyword list), and `populate_callback` defaults to
  # `false`, so both must be allowed by the type.
  @type t() :: %__MODULE__{
          name: atom,
          tasker: module,
          options: list,
          populate_callback: false | fun,
          args: list
        }
end
use GenServer
require Logger
alias __MODULE__.Entity
# Named ETS table acting as the registry: one `{name, table_ref, spec}` row per
# managed table.
@table_name :ets_manager

# Server state:
#   * `waiting`    - in-flight refreshes, keyed by the task reference, each
#                    mapped to `{new_table, old_table_or_false, spec}`
#   * `to_process` - entities still waiting to be created by `:initialise`
defstruct waiting: %{}, to_process: []

@type t() :: %__MODULE__{waiting: map, to_process: list(Entity.t)}
@doc """
Starts the manager registered under its module name.

`args` is the list of `ETSManager.Entity` structs to create (`nil` is accepted
and treated as an empty list by the initialisation step). If a manager is
already registered, its pid is returned as `{:ok, pid}`; any other start error
is passed through unchanged.
"""
# `Entity.t()` (the struct type) rather than `Entity` (which, as a type, is just
# the module atom); `GenServer.on_start()` covers the error results that the
# last clause below passes through.
@spec start_link(list(Entity.t()) | nil) :: GenServer.on_start()
def start_link(args) do
  Logger.info("Starting ETS Manager")

  case GenServer.start_link(__MODULE__, args, name: __MODULE__) do
    {:ok, pid} -> {:ok, pid}
    {:error, {:already_started, pid}} -> {:ok, pid}
    error -> error
  end
end
# Boots the manager.
#
# * Registry table missing (fresh start): create it with the Backup process as
#   heir, then continue with `:initialise` so the entities passed in `args` are
#   actually created. Without that continue they would sit in `to_process`
#   until some unrelated event happened to trigger `:initialise`.
# * Registry table present (it outlived a previous manager): continue with
#   `:get_table_back` to request the tables from the Backup process.
def init(args) do
  Process.flag(:trap_exit, true)

  case :ets.info(@table_name) do
    :undefined ->
      backup_pid = GenServer.whereis(__MODULE__.Backup)
      :ets.new(@table_name, [:named_table, {:read_concurrency, true}, {:heir, backup_pid, :main}])
      {:ok, %__MODULE__{to_process: args}, {:continue, :initialise}}

    _ ->
      {:ok, %__MODULE__{to_process: args}, {:continue, :get_table_back}}
  end
end
# Asks the Backup process to give back every table it is holding. The reply is
# ignored; each returned table arrives separately as an `:"ETS-TRANSFER"` message.
def handle_continue(:get_table_back, state) do
  Logger.info("Requesting tables back...")
  GenServer.call(__MODULE__.Backup, {:table_request, self()})
  {:noreply, state}
end

# Nothing (left) to initialise. Only `to_process` is normalised: the rest of the
# state, in particular `waiting`, must survive, otherwise refreshes that are
# still running would never be swapped in when their task replies.
def handle_continue(:initialise, %{to_process: nil} = state), do: {:noreply, %{state | to_process: []}}
def handle_continue(:initialise, %{to_process: []} = state), do: {:noreply, state}

# Creates the table for the first pending entity, registers it in the registry
# table, starts its populate task, and loops via `:continue` for the remainder.
def handle_continue(:initialise, %{to_process: [%{name: name, options: options, populate_callback: pcb, args: args, tasker: tasker} = spec | rest]} = state) do
  backup_pid = GenServer.whereis(__MODULE__.Backup)
  ref = :ets.new(name, [{:heir, backup_pid, name} | options])
  :ets.insert(@table_name, {name, ref, spec})

  start_task(fn ->
    case {pcb, args} do
      # No callback configured: the table stays empty.
      {false, _} -> :ok
      # No arguments: zero-arity callback.
      {_, []} -> :ets.insert(ref, pcb.())
      # Otherwise `args` is passed as the single argument.
      {_, _} -> :ets.insert(ref, pcb.(args))
    end
  end, tasker)

  {:noreply, %{state | to_process: rest}, {:continue, :initialise}}
end
# Starts a refresh for `spec`: a brand-new table is created and populated in a
# monitored task, while the currently registered table (if any) stays in place.
# The pending swap is remembered in `waiting` under the task's reference.
def handle_cast({:refresh, %Entity{} = spec}, %{waiting: waiting} = state) do
  %Entity{name: name, options: options, populate_callback: pcb, args: args, tasker: tasker} = spec

  # Table currently registered under this name, or `false` when there is none.
  old_ref =
    case :ets.lookup(@table_name, name) do
      [{_, current, _}] -> current
      [] -> false
    end

  heir = GenServer.whereis(__MODULE__.Backup)
  new_ref = :ets.new(name, [{:heir, heir, name} | options])

  populate = fn ->
    cond do
      pcb == false -> :ok
      args == [] -> :ets.insert(new_ref, pcb.())
      true -> :ets.insert(new_ref, pcb.(args))
    end
  end

  task = start_async(populate, tasker)
  pending = Map.put(waiting, task.ref, {new_ref, old_ref, spec})

  {:noreply, %{state | waiting: pending}}
end
# A monitored process went down. The message is always logged as a failed
# populate callback. If `ref` belongs to a pending refresh, the partially
# populated new table is deleted and the spec is put back at the head of
# `to_process`, to be re-created by the `:initialise` continue.
# NOTE(review): `:initialise` registers a fresh table under the same name without
# deleting the one registered before - confirm the previous table is not leaked.
def handle_info({:DOWN, ref, :process, _, _} = info, %{waiting: waiting, to_process: top} = state) do
Logger.error("Populate Callback failed with - #{inspect info}")
case Map.pop(waiting, ref) do
{nil, n_waiting} -> {:noreply, %{state | waiting: n_waiting}}
{{new_ref, _, spec}, n_waiting} ->
:ets.delete(new_ref)
{:noreply, %{state | waiting: n_waiting, to_process: [spec | top]}, {:continue, :initialise}}
end
end
# Reply from a populate task started by the `:refresh` cast. Matches ANY
# two-element tuple; tuples whose first element is not a key of `waiting` are
# dropped without logging.
def handle_info({ref, _result}, %{waiting: waiting} = state) do
case Map.pop(waiting, ref) do
{{new_ref, old_ref, %{name: name} = spec}, n_waiting} ->
# The task replied, so drop its monitor and flush any pending :DOWN for it.
Process.demonitor(ref, [:flush])
# Publish the new table first so the registry always points at a live table;
# only afterwards are superseded tables deleted.
case :ets.lookup(@table_name, name) do
[{_, ^old_ref, _}] -> :ets.insert(@table_name, {name, new_ref, spec})
[] -> :ets.insert(@table_name, {name, new_ref, spec})
# A different table was registered since this refresh started: replace it
# and delete it as well.
[{_, mingling_ref, _}] ->
:ets.insert(@table_name, {name, new_ref, spec})
:ets.delete(mingling_ref)
end
# Delete the table that was current when the refresh started, unless there
# was none (`false`) or it no longer exists.
case old_ref do
false -> :ok
_ -> case :ets.info(old_ref) do
:undefined -> :ok
_ -> :ets.delete(old_ref)
end
end
Logger.info("Refreshed table #{inspect name}")
{:noreply, %{state | waiting: n_waiting}}
{nil, n_waiting} ->
{:noreply, %{state | waiting: n_waiting}}
end
end
# A table was handed to this process (the Backup server gives tables away in
# reply to `:table_request`). Every such message triggers the `:initialise`
# continue.
def handle_info({:"ETS-TRANSFER", _tab, _, name}, state) do
Logger.warn("Received Table #{inspect name} back")
{:noreply, state, {:continue, :initialise}}
end
# Catch-all: anything else is logged and ignored.
def handle_info(message, state) do
Logger.error("Received in ETSManager: #{inspect message}")
{:noreply, state}
end
# Runs `func` as a child of the `tasker` Task.Supervisor via `start_child/2`;
# callers in this module ignore the return value.
defp start_task(func, tasker), do: Task.Supervisor.start_child(tasker, func)

# Runs `func` under `tasker` via `async_nolink/2` and returns the resulting
# task, whose `ref` identifies the reply message.
defp start_async(func, tasker), do: Task.Supervisor.async_nolink(tasker, func)
@doc """
Looks up `key` in the table registered as `name`.

Returns the result of `:ets.lookup/2`, or `:error` when no table is registered
under `name`.
"""
@spec lookup(atom, term) :: [tuple] | :error
def lookup(name, key), do: with_table(name, fn ref -> :ets.lookup(ref, key) end)

@doc """
Runs `:ets.match/2` with the pattern `match` against the table registered as
`name`. Returns `:error` when no table is registered under `name`.
"""
@spec match(atom, term) :: [[term]] | :error
def match(name, match), do: with_table(name, fn ref -> :ets.match(ref, match) end)

@doc """
Deletes `key` from the table registered as `name`.

Returns `true`, or `:error` when no table is registered under `name`.
"""
@spec delete(atom, term) :: true | :error
def delete(name, key), do: with_table(name, fn ref -> :ets.delete(ref, key) end)

@doc """
Inserts `entry` (a tuple or a list of tuples) into the table registered as `name`.

Returns `true`, or `:error` when no table is registered under `name`.
"""
@spec insert(atom, tuple | [tuple]) :: true | :error
def insert(name, entry), do: with_table(name, fn ref -> :ets.insert(ref, entry) end)

@doc """
Returns every entry of the table registered as `name`, or `:error` when no
table is registered under `name`.
"""
@spec tab2list(atom) :: [tuple] | :error
def tab2list(name), do: with_table(name, fn ref -> :ets.tab2list(ref) end)

# Resolves `name` to its current table through the registry table and applies
# `fun` to it. Going through the registry on every call is what makes a
# completed refresh visible to callers immediately.
defp with_table(name, fun) do
  case :ets.lookup(@table_name, name) do
    [] -> :error
    [{_, ref, _}] -> fun.(ref)
  end
end
end
@mnussbaumer
Copy link
Author

mnussbaumer commented Mar 3, 2019

A simple module for replacing the full contents of ETS tables without the tables ever being empty while they are being repopulated, and for keeping the tables alive even if processes crash. The tables are created by the ETSManager server, which has very few, if any, failure paths; the repopulate functions run in their own tasks (and, on failure, the populate is rescheduled one more time); and even if the main process does fail for some reason, the tables pass to their heir, a Backup GenServer whose only purpose is to inherit them and give them back once the main GenServer re-initialises.

For instance:

defmodule YourApp.Application do
  use Application

  # Starts the supervision tree: the Task.Supervisor that runs the populate
  # callbacks, the Backup heir, and the manager itself (started with an empty
  # list of entities).
  def start(_type, _args) do
    backup = Supervisor.child_spec({ETSManager.Backup, []}, type: :worker)

    manager =
      Supervisor.child_spec(
        %{id: ETSManager, start: {ETSManager, :start_link, [[]]}},
        type: :worker
      )

    children = [
      {Task.Supervisor, name: YourApp.TaskSupervisor},
      backup,
      manager
      # ....
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: YourApp.Supervisor)
  end
end

defmodule Test do
  @tasker YourApp.TaskSupervisor

  # Asks the manager to (re)build the :blog_posts table in the background.
  def repopulate_blog_posts do
    entity = %ETSManager.Entity{
      name: :blog_posts,
      tasker: @tasker,
      populate_callback: &populate_top_blog_posts/0
    }

    GenServer.cast(ETSManager, {:refresh, entity})
  end

  # Populate callback: do the expensive work (counts, joins, queries, ...) and
  # return the list of tuples to insert, e.g. {key, data, data2, ...}.
  def populate_top_blog_posts do
    [{key1, data1, data2}, {key2, data1, data2}]
  end
end

Basically, you need to start a Task.Supervisor to execute the callbacks; you can have several, with different entities using different Task.Supervisors.
Then you cast a {:refresh, entity} message to the ETSManager GenServer, where entity is an %ETSManager.Entity{} struct: name is the name you want to use to identify the table, and tasker is the Task.Supervisor that will be responsible for spawning the populate_callback.

So calling Test.repopulate_blog_posts/0 would create or recreate an ETS table that could then be addressed by its name, :blog_posts if using the ETSManager interface, e.g. ETSManager.lookup(:blog_posts, 1). The ETSManager module ensures that even during a repopulate call, your app will still be able to access the table at all points, and that once the repopulate is done, the table references are swapped, so when you access it again it will already be with the fresh entries, and that once the swap is done the previous table is erased.

This was mostly to solve the issue of having bulk refreshes on tables (where the new keys might not be exactly the same as before), that could for some amount of time be empty if using the "delete_all_objects" approach, and to prevent having to design a system that would diff the keys to eliminate keys that no longer were present. Since the access is done through a pointer to the ref of each created table, it's always up to date and the design is very simple and intuitive, while keeping the repopulates asynchronous and non-blocking.

TODO

Add streaming capabilities, regular callbacks to run once the populate callbacks have been run, etc...

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment