Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Elixir Phoenix Postgresql migration to add triggers for pubsub to run on every CRUD operation on every table. If a new table is added, it'll automatically add a trigger to that table too.
defmodule MyApp.Repo.Migrations.AddPostgresTriggerAndFunctionForAllTables do
use Ecto.Migration
def up do
# Create a function that broadcasts row changes
execute "
CREATE OR REPLACE FUNCTION broadcast_changes()
RETURNS trigger AS $$
DECLARE
current_row RECORD;
BEGIN
IF (TG_OP = 'INSERT' OR TG_OP = 'UPDATE') THEN
current_row := NEW;
ELSE
current_row := OLD;
END IF;
IF (TG_OP = 'INSERT') THEN
OLD := NEW;
END IF;
IF (TG_OP = 'DELETE') THEN
NEW := OLD;
END IF;
PERFORM pg_notify(
'table_changes',
json_build_object(
'table', TG_TABLE_NAME,
'type', TG_OP,
'id', current_row.id,
'new_row_data', row_to_json(NEW),
'old_row_data', row_to_json(OLD)
)::text
);
RETURN current_row;
END;
$$ LANGUAGE plpgsql;"
# Create a trigger that links all of the tables to the broadcast function. Skip the migrations table.
execute "CREATE OR REPLACE FUNCTION create_notify_triggers()
RETURNS event_trigger
LANGUAGE plpgsql
AS $$
DECLARE
r RECORD;
BEGIN
FOR r IN SELECT *
FROM information_schema.tables
where table_schema = 'public'
and table_name <> 'schema_migrations'
LOOP
RAISE NOTICE 'CREATE FOR: %', r.table_name::text;
EXECUTE 'DROP TRIGGER IF EXISTS notify_table_changes_trigger ON ' || r.table_name || ';';
EXECUTE 'CREATE TRIGGER notify_table_changes_trigger
AFTER INSERT OR UPDATE OR DELETE
ON ' || r.table_name || '
FOR EACH ROW
EXECUTE PROCEDURE broadcast_changes();';
END LOOP;
END;
$$;"
#What if we add more tables later, after this is run? This adds a trigger to add the above triggers to any new tables as well.
execute "CREATE EVENT TRIGGER add_table_broadcast_triggers ON ddl_command_end
WHEN TAG IN ('CREATE TABLE','CREATE TABLE AS')
EXECUTE PROCEDURE create_notify_triggers();"
end
def down do
execute "DROP EVENT TRIGGER add_table_broadcast_triggers"
execute "FOR r IN SELECT *
FROM information_schema.tables
where table_schema = 'public'
and table_name <> 'schema_migrations'
LOOP
RAISE NOTICE 'CREATE FOR: %', r.table_name::text;
EXECUTE 'DROP TRIGGER IF EXISTS notify_table_changes_trigger ON ' || r.table_name || ';';
END LOOP;"
end
end
#And the listener code (separate module anywhere you'd like
#This module namespace and name can be whatever you want
defmodule MyApp.DatabaseListener.Listener do
use GenServer
require Logger
import Poison, only: [decode!: 1]
@doc """
Initialize the GenServer
"""
@spec start_link([String.t], [any]) :: {:ok, pid}
def start_link(channel, otp_opts \\ []), do: GenServer.start_link(__MODULE__, channel, otp_opts)
@doc """
When the GenServer starts subscribe to the given channel
"""
@spec init([String.t]) :: {:ok, []}
def init(channel) do
Logger.debug("Starting #{ __MODULE__ } with channel subscription: #{channel}")
pg_config = MyApp.Repo.config()
{:ok, pid} = Postgrex.Notifications.start_link(pg_config)
{:ok, ref} = Postgrex.Notifications.listen(pid, channel)
{:ok, {pid, channel, ref}}
end
@doc """
Listen for changes
"""
def handle_info({:notification, _pid, _ref, "table_changes", payload}, _state) do
change = payload
|> decode!()
#change will decode json into a list with:
# type - what crud operation it is
# table - what table it was done on
# id - the ID of the row, but will also be in the old and new row data
# new_row_data - the new data either inserted or updated on the row, or nil in case of delete
# old_row_data - the old data that use to be on the row, or nil in case of insert
# You can get these with change["type"] for instance and do whatever you want with them below this line
{:noreply, :event_handled}
end
def handle_info(_, _state), do: {:noreply, :event_received}
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment