Skip to content

Instantly share code, notes, and snippets.

@bruth
Last active July 2, 2024 13:36
Show Gist options
  • Save bruth/6d53a3c2138c5adf53f5 to your computer and use it in GitHub Desktop.
Save bruth/6d53a3c2138c5adf53f5 to your computer and use it in GitHub Desktop.
Postgres push notification

Postgres push triggers

Watch a table for changes and push a notification with a payload describing the change.

Example

In the Postgres shell:

-- Create the functions
\i watch.sql

-- Create a table to watch.
create table users (id int, name text);

-- Watch the table and send notifications to channel 'changefeed'
select watch_table('users', 'changefeed');

-- Listen on the channel. This is asynchronous.
listen changefeed;

-- Insert a record into the users table.
insert into users values (1, 'Joe');

You will see the following appear in the shell as a notification.

Asynchronous notification "changefeed" with payload "{"data": {"id": 1, "name": "Joe"}, "operation": "INSERT", "table_name": "users", "schema_name": "public", "capture_time": "2016-03-04T21:24:48.24746+00:00", "transaction_time": "2016-03-04T21:24:48.247295+00:00"}" received from server process with PID 84.

Library Support

-- Requires Postgres 9.5+ (jsonb_build_object and the jsonb || operator).
--
-- if_modified_func(channel): trigger function that publishes a JSON
-- description of a table change through pg_notify().
--
-- Trigger argument:
--   TG_ARGV[0]  name of the notification channel (required).
--
-- Payload keys:
--   schema_name, table_name, operation, transaction_time, capture_time
--   data      row image: NEW for INSERT, OLD for UPDATE and DELETE,
--             null for statement-level triggers (e.g. TRUNCATE).
--   new_data  only on row-level UPDATE: the NEW row image. It is left out
--             when adding it would push the payload over the NOTIFY size
--             limit, so consumers must treat it as optional.
--
-- Raises an exception when not fired AFTER, when fired for an unhandled
-- operation/level combination, or when no channel argument was supplied.
-- NOTE(review): a single row image large enough to exceed the NOTIFY limit
-- still makes pg_notify() fail and therefore aborts the triggering
-- statement, exactly as before; decide whether that is acceptable.
CREATE OR REPLACE FUNCTION if_modified_func() RETURNS TRIGGER AS $$
DECLARE
    -- In the default configuration a NOTIFY payload must be shorter than
    -- 8000 bytes.
    max_payload_bytes CONSTANT int := 8000;
    channel  text;
    payload  jsonb;
    extended jsonb;
    rowdata  jsonb;
    newdata  jsonb;
BEGIN
    IF TG_WHEN <> 'AFTER' THEN
        RAISE EXCEPTION 'if_modified_func() may only run as an AFTER trigger';
    END IF;

    -- Capture the row image(s). NEW is only touched for INSERT/UPDATE and
    -- OLD only for UPDATE/DELETE, where they are assigned.
    IF TG_LEVEL = 'ROW' AND TG_OP IN ('UPDATE', 'DELETE') THEN
        rowdata := row_to_json(OLD.*);
        IF TG_OP = 'UPDATE' THEN
            newdata := row_to_json(NEW.*);
        END IF;
    ELSIF TG_LEVEL = 'ROW' AND TG_OP = 'INSERT' THEN
        rowdata := row_to_json(NEW.*);
    ELSIF NOT (TG_LEVEL = 'STATEMENT' AND TG_OP IN ('INSERT','UPDATE','DELETE','TRUNCATE')) THEN
        RAISE EXCEPTION '[if_modified_func] - Trigger func added as trigger for unhandled case: %, %',TG_OP, TG_LEVEL;
    END IF;

    -- An out-of-range TG_ARGV index yields NULL, so this also catches a
    -- trigger that was created without any argument.
    channel := TG_ARGV[0];
    IF channel IS NULL OR channel = '' THEN
        RAISE EXCEPTION 'if_modified_func() requires the notification channel as its first trigger argument';
    END IF;

    -- Construct JSON payload
    payload := jsonb_build_object('schema_name', TG_TABLE_SCHEMA::text,
                                  'table_name', TG_TABLE_NAME::text,
                                  'operation', TG_OP,
                                  'transaction_time', transaction_timestamp(),
                                  'capture_time', clock_timestamp(),
                                  'data', rowdata);

    -- Attach the post-update image only when the result still fits, so an
    -- UPDATE that could be notified before this key existed still can be.
    IF newdata IS NOT NULL THEN
        extended := payload || jsonb_build_object('new_data', newdata);
        IF octet_length(extended::text) < max_payload_bytes THEN
            payload := extended;
        END IF;
    END IF;

    -- Notify to channel with serialized JSON payload.
    PERFORM pg_notify(channel, payload::text);

    -- The return value of an AFTER trigger is ignored.
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;
-- Create triggers that will execute on any change to the table.
--
-- watch_table(target_table, channel)
--   target_table  table to watch; a regclass, so a quoted and optionally
--                 schema-qualified name such as 'client.payments' works.
--   channel       NOTIFY channel handed to if_modified_func() as its
--                 trigger argument.
--
-- Any triggers left by a previous watch_table() call on the same table are
-- dropped first, so calling it again simply re-points the table at the new
-- channel. Each generated statement is echoed with RAISE NOTICE before it
-- is executed.
-- Raises an exception when target_table is NULL or channel is NULL/empty.
CREATE OR REPLACE FUNCTION watch_table(target_table regclass, channel text) RETURNS void AS $$
DECLARE
    stmt text;
BEGIN
    IF target_table IS NULL THEN
        RAISE EXCEPTION 'watch_table() requires a target table';
    END IF;
    -- A NULL channel would turn the generated statement into NULL and an
    -- empty one would create triggers that fail on every write.
    IF channel IS NULL OR channel = '' THEN
        RAISE EXCEPTION 'watch_table() requires a non-empty channel name';
    END IF;

    -- Drop existing triggers if they exist. PERFORM is the way to call a
    -- function and discard its (void) result; EXECUTE expects a query string.
    PERFORM unwatch_table(target_table);

    -- Row level watch trigger. %s renders the regclass through its output
    -- function (quoted/qualified as needed); %L quotes the channel literal.
    stmt := format('CREATE TRIGGER watch_trigger_row AFTER INSERT OR UPDATE OR DELETE ON %s' ||
                   ' FOR EACH ROW EXECUTE PROCEDURE if_modified_func(%L);',
                   target_table, channel);
    RAISE NOTICE '%', stmt;
    EXECUTE stmt;

    -- Truncate level watch trigger. This will not contain any row data.
    stmt := format('CREATE TRIGGER watch_trigger_stmt AFTER TRUNCATE ON %s' ||
                   ' FOR EACH STATEMENT EXECUTE PROCEDURE if_modified_func(%L);',
                   target_table, channel);
    RAISE NOTICE '%', stmt;
    EXECUTE stmt;
END;
$$ LANGUAGE plpgsql;
-- Unwatch a table.
--
-- unwatch_table(target_table)
--   target_table  table whose watch triggers should be removed (regclass).
--
-- Drops the watch_trigger_row and watch_trigger_stmt triggers from the
-- table. Both drops use IF EXISTS, so calling this on a table that is not
-- being watched is not an error.
-- Raises an exception when target_table is NULL.
CREATE OR REPLACE FUNCTION unwatch_table(target_table regclass) RETURNS void AS $$
BEGIN
    IF target_table IS NULL THEN
        RAISE EXCEPTION 'unwatch_table() requires a target table';
    END IF;

    -- %s renders the regclass through its output function, which quotes
    -- and schema-qualifies the name as needed.
    EXECUTE format('DROP TRIGGER IF EXISTS watch_trigger_row ON %s', target_table);
    EXECUTE format('DROP TRIGGER IF EXISTS watch_trigger_stmt ON %s', target_table);
END;
$$ LANGUAGE plpgsql;
@alexesDev
Copy link

alexesDev commented Sep 6, 2017

For tables in other schema

select watch_table('client.payments'::regclass, 'payment_feed');

@agilov
Copy link

agilov commented Dec 26, 2017

fix BEGIN for unwatch_table function. thank you

@agilov
Copy link

agilov commented Jan 11, 2018

in this code:
IF (TG_OP = 'UPDATE' AND TG_LEVEL = 'ROW') THEN
rowdata = row_to_json(OLD.*);

better to use NEW.*

if use OLD - trigger cannot send actual updated data

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment