Last active
March 4, 2019 16:25
-
-
Save mnussbaumer/270e10a89733b860489bce230cb615da to your computer and use it in GitHub Desktop.
ETS Swapper
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule ETSManager do
  @moduledoc """
  Keeps a registry of ETS tables whose contents can be replaced wholesale
  without readers ever seeing an empty table.

  The registry is a named ETS table (`:ets_manager`) holding one
  `{name, tid, %ETSManager.Entity{}}` row per managed table. A
  `{:refresh, %ETSManager.Entity{}}` cast builds a brand-new table in a task
  run under the entity's `tasker` (a `Task.Supervisor`). The registry row is
  only pointed at the new table once the populate callback has finished
  successfully; the previous table is then deleted.

  Every table is created with `ETSManager.Backup` as its ETS heir (when that
  process is running), so the tables are inherited by it if this server
  crashes and are handed back when the server restarts.
  """

  defmodule Backup do
    @moduledoc """
    ETS heir for the tables created by `ETSManager`.

    When `ETSManager` dies, its tables are transferred here. They are kept
    until the restarted manager asks for them with `{:table_request, pid}`.
    """
    use GenServer
    require Logger

    defstruct main: nil, tables: []

    @doc """
    Starts the backup process, registered under this module's name.

    Returns `:ignore` when it is already running, so a duplicate child does
    not make its supervisor fail.
    """
    def start_link(_) do
      # Logged before starting: a bare expression is not allowed between
      # `case ... do` and its clauses.
      Logger.info("Starting ETSManager.Backup...")

      case GenServer.start_link(__MODULE__, nil, name: __MODULE__) do
        {:ok, pid} -> {:ok, pid}
        {:error, {:already_started, _pid}} -> :ignore
        error -> error
      end
    end

    @impl true
    def init(_args), do: {:ok, %__MODULE__{}}

    # Inherited tables arrive as ETS-TRANSFER messages whose last element is
    # the heir data chosen by ETSManager: `:main` for the registry table and
    # the table name for every managed table.
    @impl true
    def handle_info({:"ETS-TRANSFER", table, _from, :main}, state) do
      Logger.warn("ETSManager was killed - :main cache received...")
      {:noreply, %{state | main: table}}
    end

    def handle_info({:"ETS-TRANSFER", table, _from, name}, %{tables: tables} = state) do
      Logger.warn("ETSManager was killed - table #{inspect name} received...")
      {:noreply, %{state | tables: [{table, name} | tables]}}
    end

    # Log and ignore anything else rather than letting a stray message crash
    # the process that currently owns the inherited tables.
    def handle_info(message, state) do
      Logger.error("Received in ETSManager.Backup: #{inspect message}")
      {:noreply, state}
    end

    # Hands every held table back to `pid` (the restarted ETSManager) with
    # the same heir data it was received with, then forgets about them.
    @impl true
    def handle_call({:table_request, _}, _from, %{main: nil, tables: []} = state) do
      Logger.error("Tables recovery was requested but no tables are being held...")
      {:reply, {:error, :no_table}, state}
    end

    def handle_call({:table_request, pid}, _from, %{main: main, tables: tables}) do
      case main do
        nil -> Logger.warn("Tables recovery was requested but no :main table is being held...")
        _ -> :ets.give_away(main, pid, :main)
      end

      Enum.each(tables, fn {table, name} ->
        Logger.info("Returning table #{inspect name} to ETSManager...")
        :ets.give_away(table, pid, name)
      end)

      {:reply, :ok, %__MODULE__{}}
    end
  end

  defmodule Entity do
    @moduledoc """
    Description of one managed table.

      * `name` - registry key, also passed as the name to `:ets.new/2`
      * `tasker` - the `Task.Supervisor` that runs the populate callback
      * `options` - options for `:ets.new/2` (the heir option is prepended).
        The populate task writes from another process, so keep `:public`.
      * `populate_callback` - `false` (table stays empty), a 0-arity
        function (used when `args` is `[]`) or a 1-arity function called
        with `args`; its result is passed to `:ets.insert/2`
      * `args` - the argument handed to a 1-arity callback
    """
    @enforce_keys [:name, :tasker]
    defstruct [
      :name,
      :tasker,
      options: [:public, read_concurrency: true],
      populate_callback: false,
      args: []
    ]

    @type t() :: %__MODULE__{
            name: atom,
            tasker: Supervisor.supervisor(),
            options: [atom | tuple],
            populate_callback: false | (-> tuple | [tuple]) | (list -> tuple | [tuple]),
            args: list
          }
  end

  use GenServer
  require Logger
  alias __MODULE__.Entity

  @table_name :ets_manager

  # waiting: async task ref => {table being populated, table it replaces (or false), spec}
  # to_process: entities still to be created by the :initialise continue loop
  defstruct waiting: %{}, to_process: []

  @type t() :: %__MODULE__{
          waiting: %{optional(reference) => {:ets.tid(), :ets.tid() | false, Entity.t()}},
          to_process: [Entity.t()] | nil
        }

  @doc """
  Starts the manager, registered under this module's name.

  `args` is the list of entities to create and populate on start. If the
  manager is already running, its pid is returned as `{:ok, pid}`.
  """
  @spec start_link([Entity.t()] | nil) :: GenServer.on_start()
  def start_link(args) do
    Logger.info("Starting ETS Manager")

    case GenServer.start_link(__MODULE__, args, name: __MODULE__) do
      {:ok, pid} -> {:ok, pid}
      {:error, {:already_started, pid}} -> {:ok, pid}
      error -> error
    end
  end

  @impl true
  def init(args) do
    Process.flag(:trap_exit, true)
    state = %__MODULE__{to_process: args}

    case :ets.info(@table_name) do
      :undefined ->
        # First start: create the registry, then build the requested tables.
        # (Previously the entities given to start_link/1 were never built here.)
        :ets.new(@table_name, [:named_table, {:read_concurrency, true}, heir_option(:main)])
        {:ok, state, {:continue, :initialise}}

      _ ->
        # The registry outlived a previous incarnation (it went to its heir,
        # ETSManager.Backup): ask for the tables back. :initialise runs when
        # they are transferred back (see the ETS-TRANSFER clause).
        {:ok, state, {:continue, :get_table_back}}
    end
  end

  @impl true
  def handle_continue(:get_table_back, state) do
    Logger.info("Requesting tables back...")
    GenServer.call(__MODULE__.Backup, {:table_request, self()})
    {:noreply, state}
  end

  # Nothing left to build. Only `to_process` is reset, so refreshes still in
  # flight (`waiting`) are kept.
  def handle_continue(:initialise, %{to_process: pending} = state) when pending in [nil, []] do
    {:noreply, %{state | to_process: []}}
  end

  # Creates the next pending table, registers it straight away and fills it
  # in a fire-and-forget task; then loops until `to_process` is empty.
  def handle_continue(
        :initialise,
        %{to_process: [%{name: name, options: options, populate_callback: pcb, args: args, tasker: tasker} = spec | rest]} = state
      ) do
    tid = :ets.new(name, [heir_option(name) | options])
    replaced = registered_table(name)
    :ets.insert(@table_name, {name, tid, spec})
    # The replaced table (a retry after a failed refresh, or a table recovered
    # after a restart) is no longer reachable through the registry.
    discard_table(replaced)
    start_task(fn -> populate(tid, pcb, args) end, tasker)
    {:noreply, %{state | to_process: rest}, {:continue, :initialise}}
  end

  # Builds a replacement table asynchronously; the registry keeps pointing at
  # the current table until the task replies (see handle_info/2).
  @impl true
  def handle_cast(
        {:refresh, %Entity{name: name, options: options, populate_callback: pcb, args: args, tasker: tasker} = spec},
        %{waiting: waiting} = state
      ) do
    old_ref = registered_table(name)
    new_ref = :ets.new(name, [heir_option(name) | options])
    %{ref: task_ref} = start_async(fn -> populate(new_ref, pcb, args) end, tasker)
    {:noreply, %{state | waiting: Map.put(waiting, task_ref, {new_ref, old_ref, spec})}}
  end

  # A monitored populate task died: drop its half-built table and retry the
  # entity once through :initialise (which uses an unmonitored task).
  @impl true
  def handle_info({:DOWN, ref, :process, _, _} = info, %{waiting: waiting, to_process: top} = state) do
    Logger.error("Populate Callback failed with - #{inspect info}")

    case Map.pop(waiting, ref) do
      {nil, n_waiting} ->
        {:noreply, %{state | waiting: n_waiting}}

      {{new_ref, _old_ref, spec}, n_waiting} ->
        discard_table(new_ref)
        retry = [spec | List.wrap(top)]
        {:noreply, %{state | waiting: n_waiting, to_process: retry}, {:continue, :initialise}}
    end
  end

  # A populate task replied: swap the registry to the new table, then delete
  # the table it replaces.
  def handle_info({ref, _result}, %{waiting: waiting} = state) when is_reference(ref) do
    case Map.pop(waiting, ref) do
      {nil, n_waiting} ->
        {:noreply, %{state | waiting: n_waiting}}

      {{new_ref, old_ref, %{name: name} = spec}, n_waiting} ->
        # The task succeeded; remove its monitor and any queued :DOWN.
        Process.demonitor(ref, [:flush])
        current = registered_table(name)
        :ets.insert(@table_name, {name, new_ref, spec})
        # Another refresh of the same name may have finished first; the table
        # it registered is now unreachable as well.
        if current != old_ref, do: discard_table(current)
        discard_table(old_ref)
        Logger.info("Refreshed table #{inspect name}")
        {:noreply, %{state | waiting: n_waiting}}
    end
  end

  def handle_info({:"ETS-TRANSFER", _tab, _, name}, state) do
    Logger.warn("Received Table #{inspect name} back")
    {:noreply, state, {:continue, :initialise}}
  end

  def handle_info(message, state) do
    Logger.error("Received in ETSManager: #{inspect message}")
    {:noreply, state}
  end

  @doc "Looks up `key` in the table registered as `name`; `:error` if `name` is unknown."
  @spec lookup(atom, term) :: [tuple] | :error
  def lookup(name, key), do: with_table(name, &:ets.lookup(&1, key))

  @doc "Runs `:ets.match/2` on the table registered as `name`; `:error` if `name` is unknown."
  @spec match(atom, term) :: [list] | :error
  def match(name, match), do: with_table(name, &:ets.match(&1, match))

  @doc "Deletes `key` from the table registered as `name`; `:error` if `name` is unknown."
  @spec delete(atom, term) :: true | :error
  def delete(name, key), do: with_table(name, &:ets.delete(&1, key))

  @doc "Inserts `entry` into the table registered as `name`; `:error` if `name` is unknown."
  @spec insert(atom, tuple | [tuple]) :: true | :error
  def insert(name, entry), do: with_table(name, &:ets.insert(&1, entry))

  @doc "Returns every object of the table registered as `name`; `:error` if `name` is unknown."
  @spec tab2list(atom) :: [tuple] | :error
  def tab2list(name), do: with_table(name, &:ets.tab2list/1)

  # Resolves `name` through the registry and applies `fun` to its table id.
  # Raises ArgumentError (from :ets.lookup/2) if the registry does not exist.
  defp with_table(name, fun) do
    case :ets.lookup(@table_name, name) do
      [] -> :error
      [{_, tid, _}] -> fun.(tid)
    end
  end

  # Table id currently registered for `name`, or `false` when none is.
  defp registered_table(name) do
    case :ets.lookup(@table_name, name) do
      [{_, tid, _}] -> tid
      [] -> false
    end
  end

  # Heir option for :ets.new/2: ETSManager.Backup when it is running,
  # otherwise no heir ({:heir, nil, data} would make :ets.new/2 raise).
  defp heir_option(data) do
    case GenServer.whereis(__MODULE__.Backup) do
      pid when is_pid(pid) -> {:heir, pid, data}
      _ -> {:heir, :none}
    end
  end

  # Deletes a table that is no longer registered. Does nothing for `false`,
  # for tables that no longer exist, and for tables owned by another process.
  defp discard_table(false), do: :ok

  defp discard_table(tid) do
    if :ets.info(tid, :owner) == self(), do: :ets.delete(tid)
    :ok
  end

  # Fills `tid` from the entity's populate callback (see Entity docs).
  defp populate(_tid, false, _args), do: :ok
  defp populate(tid, callback, []), do: :ets.insert(tid, callback.())
  defp populate(tid, callback, args), do: :ets.insert(tid, callback.(args))

  defp start_task(func, tasker), do: Task.Supervisor.start_child(tasker, func)

  defp start_async(func, tasker), do: Task.Supervisor.async_nolink(tasker, func)
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
A simple module to manage the full replacement of ETS table contents without the tables ever becoming empty while they are being repopulated, while also ensuring the tables survive process crashes. The tables are created by the ETSManager server, which has very few (if any) failure paths. The repopulate functions are called in their own tasks (on failure, the populate is rescheduled one more time). And even if the main process fails for some reason, its tables are passed to a Backup GenServer set as their heir, whose only purpose is to hold them and give them back once the main GenServer re-initialises.
For instance:
Basically, you need to start a `Task.Supervisor` to act as the executor of the populate callbacks; you can have several, using different `Task.Supervisor`s. Then you cast a `{:refresh, entity}` message to the `ETSManager` GenServer, where `entity` is an `%ETSManager.Entity{}` struct: its `name` is the name you want to use to identify the table, and its `tasker` is the `Task.Supervisor` responsible for spawning the `populate_callback`. So calling `Test.repopulate_blog_posts/0` would create or recreate an ETS table that can then be addressed by its name, `:blog_posts`, through the `ETSManager` interface, e.g. `ETSManager.lookup(:blog_posts, 1)`.

The `ETSManager` module ensures that:

- even during a repopulate call, your app can still access the table at all times;
- once the repopulate is done, the table references are swapped, so the next access already sees the fresh entries;
- once the swap is done, the previous table is erased.

This was mostly written to solve the problem of bulk refreshes on tables where the new keys might not be exactly the same as before. With the `:ets.delete_all_objects/1` approach, such tables could be empty for some amount of time. It also avoids having to design a system that diffs the keys to remove the ones that are no longer present. Since access always goes through the registered reference of each created table, it is always up to date. The design stays simple and intuitive while keeping the repopulates asynchronous and non-blocking.
TODO
Add streaming capabilities, callbacks to run once the populate callbacks have completed, etc.