Skip to content

Instantly share code, notes, and snippets.

@carterbryden
Last active December 18, 2023 08:10
Show Gist options
  • Star 13 You must be signed in to star a gist
  • Fork 4 You must be signed in to fork a gist
  • Save carterbryden/e7b19d1ba1ea92e241fed259fea23491 to your computer and use it in GitHub Desktop.
Save carterbryden/e7b19d1ba1ea92e241fed259fea23491 to your computer and use it in GitHub Desktop.
Elixir Phoenix Postgresql migration to add triggers for pubsub to run on every CRUD operation on every table. If a new table is added, it'll automatically add a trigger to that table too.
defmodule MyApp.Repo.Migrations.AddPostgresTriggerAndFunctionForAllTables do
use Ecto.Migration
def up do
# Create a function that broadcasts row changes
execute "
CREATE OR REPLACE FUNCTION broadcast_changes()
RETURNS trigger AS $$
DECLARE
current_row RECORD;
BEGIN
IF (TG_OP = 'INSERT' OR TG_OP = 'UPDATE') THEN
current_row := NEW;
ELSE
current_row := OLD;
END IF;
IF (TG_OP = 'INSERT') THEN
OLD := NEW;
END IF;
IF (TG_OP = 'DELETE') THEN
NEW := OLD;
END IF;
PERFORM pg_notify(
'table_changes',
json_build_object(
'table', TG_TABLE_NAME,
'type', TG_OP,
'id', current_row.id,
'new_row_data', row_to_json(NEW),
'old_row_data', row_to_json(OLD)
)::text
);
RETURN current_row;
END;
$$ LANGUAGE plpgsql;"
# Create a trigger that links all of the tables to the broadcast function. Skip the migrations table.
execute "CREATE OR REPLACE FUNCTION create_notify_triggers()
RETURNS event_trigger
LANGUAGE plpgsql
AS $$
DECLARE
r RECORD;
BEGIN
FOR r IN SELECT *
FROM information_schema.tables
where table_schema = 'public'
and table_name <> 'schema_migrations'
LOOP
RAISE NOTICE 'CREATE FOR: %', r.table_name::text;
EXECUTE 'DROP TRIGGER IF EXISTS notify_table_changes_trigger ON ' || r.table_name || ';';
EXECUTE 'CREATE TRIGGER notify_table_changes_trigger
AFTER INSERT OR UPDATE OR DELETE
ON ' || r.table_name || '
FOR EACH ROW
EXECUTE PROCEDURE broadcast_changes();';
END LOOP;
END;
$$;"
#What if we add more tables later, after this is run? This adds a trigger to add the above triggers to any new tables as well.
execute "CREATE EVENT TRIGGER add_table_broadcast_triggers ON ddl_command_end
WHEN TAG IN ('CREATE TABLE','CREATE TABLE AS')
EXECUTE PROCEDURE create_notify_triggers();"
end
def down do
execute "DROP EVENT TRIGGER add_table_broadcast_triggers"
execute "FOR r IN SELECT *
FROM information_schema.tables
where table_schema = 'public'
and table_name <> 'schema_migrations'
LOOP
RAISE NOTICE 'CREATE FOR: %', r.table_name::text;
EXECUTE 'DROP TRIGGER IF EXISTS notify_table_changes_trigger ON ' || r.table_name || ';';
END LOOP;"
end
end
#And the listener code (separate module anywhere you'd like
#This module namespace and name can be whatever you want
defmodule MyApp.DatabaseListener.Listener do
use GenServer
require Logger
import Poison, only: [decode!: 1]
@doc """
Initialize the GenServer
"""
@spec start_link([String.t], [any]) :: {:ok, pid}
def start_link(channel, otp_opts \\ []), do: GenServer.start_link(__MODULE__, channel, otp_opts)
@doc """
When the GenServer starts subscribe to the given channel
"""
@spec init([String.t]) :: {:ok, []}
def init(channel) do
Logger.debug("Starting #{ __MODULE__ } with channel subscription: #{channel}")
pg_config = MyApp.Repo.config()
{:ok, pid} = Postgrex.Notifications.start_link(pg_config)
{:ok, ref} = Postgrex.Notifications.listen(pid, channel)
{:ok, {pid, channel, ref}}
end
@doc """
Listen for changes
"""
def handle_info({:notification, _pid, _ref, "table_changes", payload}, _state) do
change = payload
|> decode!()
#change will decode json into a list with:
# type - what crud operation it is
# table - what table it was done on
# id - the ID of the row, but will also be in the old and new row data
# new_row_data - the new data either inserted or updated on the row, or nil in case of delete
# old_row_data - the old data that use to be on the row, or nil in case of insert
# You can get these with change["type"] for instance and do whatever you want with them below this line
{:noreply, :event_handled}
end
def handle_info(_, _state), do: {:noreply, :event_received}
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment