Skip to content

Instantly share code, notes, and snippets.

@colophonemes
Last active February 17, 2024 15:15
Show Gist options
  • Save colophonemes/9701b906c5be572a40a84b08f4d2fa4e to your computer and use it in GitHub Desktop.
Save colophonemes/9701b906c5be572a40a84b08f4d2fa4e to your computer and use it in GitHub Desktop.
Postgres TRIGGER to call NOTIFY with a JSON payload
-- Publish a notification for every row-level change on the person table.
-- The arguments are the column names whose values notify_trigger() copies
-- into the "data" object of the JSON payload.
-- NOTE(review): the original attached this trigger to "income", which (per the
-- income_notify trigger) is not shown to have email/username columns; "person"
-- is inferred from the trigger name and income.person_id -- confirm the table.
CREATE TRIGGER person_notify AFTER INSERT OR UPDATE OR DELETE ON person
FOR EACH ROW EXECUTE PROCEDURE notify_trigger(
  'id',
  'email',
  'username'
);
-- Publish a notification for every row-level change on the income table.
-- Each argument names a column that notify_trigger() reads from the affected
-- row and includes in the payload.
CREATE TRIGGER income_notify
  AFTER INSERT OR UPDATE OR DELETE
  ON income
  FOR EACH ROW
  EXECUTE PROCEDURE notify_trigger(
    'id', 'person_id', 'amount', 'currency_code',
    'start_date', 'end_date', 'data_source'
  );
const pg = require('pg')
var pgConString = process.env.DATABASE_URL

// Connect to the DB and subscribe to the channel that notify_trigger()
// publishes on. The client is intentionally kept for the life of the process:
// LISTEN only delivers notifications on the connection that issued it.
// NOTE(review): pg.connect() exists only in older releases of the `pg` module;
// confirm the installed version before upgrading.
pg.connect(pgConString, function (err, client) {
  if (err) {
    // Without a connection there is no client to attach handlers to, so stop
    // here instead of falling through and dereferencing an unusable client.
    console.error(err)
    return
  }

  // Handle notifications
  client.on('notification', function (msg) {
    // msg.payload is the JSON text built by the trigger function
    const payload = msg.payload
    console.log(payload)
    // Send payload into a queue etc...
  })

  // Listen for NOTIFY calls; report a failed LISTEN rather than ignoring it
  client.query('LISTEN db_notifications', function (listenErr) {
    if (listenErr) {
      console.error(listenErr)
    }
  })
})
-- Trigger function: publish a JSON description of the changed row on the
-- 'db_notifications' channel via pg_notify().
--
-- Trigger arguments (TG_ARGV): names of the columns to include in the payload.
-- Each value is cast to TEXT, so every non-NULL value appears as a JSON string;
-- NULL column values are emitted as JSON null.
--
-- Payload shape:
--   {"timestamp":"...","operation":"INSERT|UPDATE|DELETE",
--    "schema":"...","table":"...","data":{"<column>":"<value>", ...}}
--
-- Raises an exception for any TG_OP other than INSERT, UPDATE or DELETE.
-- NOTE(review): NOTIFY payloads have a size limit (8000 bytes in the default
-- configuration per the PostgreSQL docs); keep the column list small.
CREATE FUNCTION notify_trigger() RETURNS trigger AS $trigger$
DECLARE
  rec RECORD;
  payload TEXT;
  column_name TEXT;
  column_value TEXT;
  -- Start from an empty array so "data" renders as {} when no columns are listed
  payload_items TEXT[] := ARRAY[]::TEXT[];
BEGIN
  -- Set record row depending on operation
  CASE TG_OP
  WHEN 'INSERT', 'UPDATE' THEN
    rec := NEW;
  WHEN 'DELETE' THEN
    rec := OLD;
  ELSE
    RAISE EXCEPTION 'Unknown TG_OP: "%". Should not occur!', TG_OP;
  END CASE;

  -- Get required fields. TG_ARGV is NULL when the trigger is declared without
  -- arguments, and FOREACH rejects a NULL array, hence the COALESCE.
  FOREACH column_name IN ARRAY COALESCE(TG_ARGV, ARRAY[]::TEXT[]) LOOP
    -- %I quotes the column name as an identifier; the row is passed as $1
    EXECUTE format('SELECT $1.%I::TEXT', column_name)
    INTO column_value
    USING rec;
    -- to_json() yields a correctly escaped JSON string (quotes, backslashes,
    -- control characters). It returns NULL for NULL input, which is mapped to
    -- a JSON null so the key is not silently dropped.
    payload_items := array_append(
      payload_items,
      to_json(column_name)::TEXT || ':' || COALESCE(to_json(column_value)::TEXT, 'null')
    );
  END LOOP;

  -- Build the payload
  payload := ''
    || '{'
    || '"timestamp":"' || CURRENT_TIMESTAMP || '",'
    || '"operation":"' || TG_OP || '",'
    || '"schema":' || to_json(TG_TABLE_SCHEMA::TEXT)::TEXT || ','
    || '"table":' || to_json(TG_TABLE_NAME::TEXT)::TEXT || ','
    || '"data":{' || array_to_string(payload_items, ',') || '}'
    || '}';

  -- Notify the channel
  PERFORM pg_notify('db_notifications', payload);

  RETURN rec;
END;
$trigger$ LANGUAGE plpgsql;
@deft01
Copy link

deft01 commented Jun 20, 2022

Hey, first of all, thanks so much for writing this! I noticed that this function doesn't work if the column is itself a JSON value using PG's built-in JSON/JSONB data types. So I modified your function to support it; I hope it's helpful.

-- Trigger function: publish a JSON description of the changed row on the
-- 'db_notifications' channel via pg_notify(), building the payload with the
-- JSON functions so that all values are escaped correctly.
--
-- Trigger arguments (TG_ARGV): names of the columns to include under "data".
-- Each value is cast to TEXT first, so it appears as a JSON string (or null).
-- Raises an exception for any TG_OP other than INSERT, UPDATE or DELETE.
CREATE FUNCTION notify_trigger() RETURNS trigger AS $trigger$
DECLARE
  rec RECORD;
  payload TEXT;
  column_name TEXT;
  column_value TEXT;
  -- Start from an empty object so "data" is {} rather than null when the
  -- trigger lists no columns
  payload_items JSONB := '{}'::jsonb;
BEGIN
  -- Set record row depending on operation
  CASE TG_OP
  WHEN 'INSERT', 'UPDATE' THEN
     rec := NEW;
  WHEN 'DELETE' THEN
     rec := OLD;
  ELSE
     RAISE EXCEPTION 'Unknown TG_OP: "%". Should not occur!', TG_OP;
  END CASE;

  -- Get required fields. TG_ARGV is NULL when the trigger is declared without
  -- arguments, and FOREACH rejects a NULL array, hence the COALESCE.
  FOREACH column_name IN ARRAY COALESCE(TG_ARGV, ARRAY[]::TEXT[]) LOOP
    -- %I quotes the column name as an identifier; the row is passed as $1
    EXECUTE format('SELECT $1.%I::TEXT', column_name)
    INTO column_value
    USING rec;
    -- Merge this column into the accumulated object
    payload_items := payload_items || jsonb_build_object(column_name, column_value);
  END LOOP;

  -- Build the payload
  payload := json_build_object(
    'timestamp',CURRENT_TIMESTAMP,
    'operation',TG_OP,
    'schema',TG_TABLE_SCHEMA,
    'table',TG_TABLE_NAME,
    'data',payload_items
  );

  -- Notify the channel
  PERFORM pg_notify('db_notifications', payload);

  RETURN rec;
END;
$trigger$ LANGUAGE plpgsql;

Thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment