Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@marioidival
Forked from bruth/README.md
Created February 19, 2018 14:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save marioidival/bd266b68c2b3f16d412a66ee670c36c0 to your computer and use it in GitHub Desktop.
Save marioidival/bd266b68c2b3f16d412a66ee670c36c0 to your computer and use it in GitHub Desktop.
Postgres push notification

Postgres push triggers

Watch a table for changes and push a notification with a payload describing the change.

Example

In the Postgres shell:

-- Create the functions
\i watch.sql

-- Create a table to watch.
create table users (id int, name text);

-- Watch the table and send notifications to channel 'changefeed'.
-- The table name is given as a string literal, which Postgres casts to
-- regclass; a bare identifier would be parsed as a column reference and fail.
select watch_table('users', 'changefeed');

-- Listen on the channel. This is asynchronous.
listen changefeed;

-- Insert a record into the users table.
insert into users (id, name) values (1, 'Joe');

You will see the following appear in the shell as a notification.

Asynchronous notification "changefeed" with payload "{"data": {"id": 1, "name": "Joe"}, "operation": "INSERT", "table_name": "users", "schema_name": "public", "capture_time": "2016-03-04T21:24:48.24746+00:00", "transaction_time": "2016-03-04T21:24:48.247295+00:00"}" received from server process with PID 84.

Library Support

-- Requires Postgres 9.5+ (jsonb_build_object and the jsonb || operator are
-- not available in 9.4).
--
-- if_modified_func(): trigger function that publishes a JSON description of a
-- table change through pg_notify().
--
-- Trigger arguments:
--   TG_ARGV[0]  name of the notification channel to publish on.
--
-- Payload keys:
--   schema_name, table_name, operation  taken from the trigger context
--   transaction_time                    transaction_timestamp()
--   capture_time                        clock_timestamp()
--   data                                OLD row for row-level UPDATE/DELETE,
--                                       NEW row for row-level INSERT,
--                                       null for statement-level triggers
--   data_omitted                        only present (true) when the full
--                                       payload would not fit in a
--                                       notification; "data" is then null
--
-- Raises an exception when attached as anything other than an AFTER trigger
-- or for an unhandled operation/level combination. Always returns NULL.
CREATE OR REPLACE FUNCTION if_modified_func() RETURNS TRIGGER AS $$
DECLARE
    -- NOTIFY payloads must be shorter than 8000 bytes in the default
    -- server configuration.
    max_payload_bytes CONSTANT integer := 8000;
    channel text;
    metadata jsonb;
    payload jsonb;
    rowdata jsonb;
BEGIN
    IF TG_WHEN <> 'AFTER' THEN
        RAISE EXCEPTION 'if_modified_func() may only run as an AFTER trigger';
    END IF;

    -- Pick the row image to publish. Statement-level triggers carry no row,
    -- so rowdata stays NULL for them.
    -- NOTE(review): UPDATE publishes the OLD row only; confirm that consumers
    -- do not expect the NEW values here.
    IF TG_LEVEL = 'ROW' AND TG_OP IN ('UPDATE', 'DELETE') THEN
        rowdata = row_to_json(OLD.*)::jsonb;
    ELSIF TG_LEVEL = 'ROW' AND TG_OP = 'INSERT' THEN
        rowdata = row_to_json(NEW.*)::jsonb;
    ELSIF NOT (TG_LEVEL = 'STATEMENT' AND TG_OP IN ('INSERT','UPDATE','DELETE','TRUNCATE')) THEN
        RAISE EXCEPTION '[if_modified_func] - Trigger func added as trigger for unhandled case: %, %',TG_OP, TG_LEVEL;
    END IF;

    -- Construct JSON payload
    metadata = jsonb_build_object('schema_name', TG_TABLE_SCHEMA::text,
                                  'table_name', TG_TABLE_NAME::text,
                                  'operation', TG_OP,
                                  'transaction_time', transaction_timestamp(),
                                  'capture_time', clock_timestamp());
    payload = metadata || jsonb_build_object('data', rowdata);

    -- pg_notify() raises an error for oversized payloads, which would abort
    -- the statement that modified the watched table. Fall back to a
    -- metadata-only notification so wide rows can still be written.
    IF octet_length(payload::text) >= max_payload_bytes THEN
        payload = metadata || jsonb_build_object('data', NULL::jsonb,
                                                 'data_omitted', true);
    END IF;

    channel = TG_ARGV[0];

    -- Notify to channel with serialized JSON payload.
    PERFORM pg_notify(channel, payload::text);

    -- The return value of an AFTER trigger is ignored.
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;
-- Create triggers that will execute on any change to the table.
--
-- watch_table(target_table, channel)
--   target_table  table to watch; regclass, so it must already exist and is
--                 rendered with whatever quoting/schema qualification it needs.
--   channel       NOTIFY channel handed to if_modified_func() as its first
--                 trigger argument; must be non-empty.
--
-- Any triggers left by a previous watch_table() call are dropped first, then
-- a row-level trigger (INSERT/UPDATE/DELETE) and a statement-level trigger
-- (TRUNCATE) are created. Each generated statement is echoed with
-- RAISE NOTICE before it is executed.
CREATE OR REPLACE FUNCTION watch_table(target_table regclass, channel text) RETURNS void AS $$
DECLARE
    stmt text;
BEGIN
    -- An empty channel would install triggers whose pg_notify() call fails on
    -- every write to the table, so reject it up front.
    IF channel IS NULL OR channel = '' THEN
        RAISE EXCEPTION 'watch_table() requires a non-empty channel name';
    END IF;

    -- Drop existing triggers if they exist. PERFORM discards the void result;
    -- EXECUTE would try to run it as a dynamic SQL string.
    PERFORM unwatch_table(target_table);

    -- Row level watch trigger.
    stmt = format(
        'CREATE TRIGGER watch_trigger_row AFTER INSERT OR UPDATE OR DELETE ON %s FOR EACH ROW EXECUTE PROCEDURE if_modified_func(%L);',
        target_table, channel);
    RAISE NOTICE '%', stmt;
    EXECUTE stmt;

    -- Truncate level watch trigger. This will not contain any row data.
    stmt = format(
        'CREATE TRIGGER watch_trigger_stmt AFTER TRUNCATE ON %s FOR EACH STATEMENT EXECUTE PROCEDURE if_modified_func(%L);',
        target_table, channel);
    RAISE NOTICE '%', stmt;
    EXECUTE stmt;
END;
$$ LANGUAGE plpgsql;
-- Unwatch a table.
--
-- unwatch_table(target_table)
--   target_table  table whose watch triggers should be removed (regclass).
--
-- Drops the watch_trigger_row and watch_trigger_stmt triggers from the table.
-- Uses DROP TRIGGER IF EXISTS, so calling it on a table that is not being
-- watched is a no-op.
CREATE OR REPLACE FUNCTION unwatch_table(target_table regclass) RETURNS void AS $$
BEGIN
    EXECUTE 'DROP TRIGGER IF EXISTS watch_trigger_row ON ' || target_table;
    EXECUTE 'DROP TRIGGER IF EXISTS watch_trigger_stmt ON ' || target_table;
END;
$$ LANGUAGE plpgsql;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment