Skip to content

Instantly share code, notes, and snippets.

@colophonemes
Last active February 17, 2024 15:15
Show Gist options
  • Save colophonemes/9701b906c5be572a40a84b08f4d2fa4e to your computer and use it in GitHub Desktop.
Save colophonemes/9701b906c5be572a40a84b08f4d2fa4e to your computer and use it in GitHub Desktop.
Postgres TRIGGER to call NOTIFY with a JSON payload
CREATE TRIGGER person_notify AFTER INSERT OR UPDATE OR DELETE ON income
FOR EACH ROW EXECUTE PROCEDURE notify_trigger(
'id',
'email',
'username'
);
CREATE TRIGGER income_notify AFTER INSERT OR UPDATE OR DELETE ON income
FOR EACH ROW EXECUTE PROCEDURE notify_trigger(
'id',
'person_id',
'amount',
'currency_code',
'start_date',
'end_date',
'data_source'
);
const pg = require('pg')
var pgConString = process.env.DATABASE_URL
// Connect to the DB
pg.connect(pgConString, function (err, client) {
if (err) {
console.error(err)
}
// Handle notifications
client.on('notification', function (msg) {
const payload = msg.payload
console.log(payload)
// Send payload into a queue etc...
})
// Listen for NOTIFY calls
var query = client.query('LISTEN db_notifications')
})
-- Trigger notification for messaging to PG Notify
CREATE FUNCTION notify_trigger() RETURNS trigger AS $trigger$
DECLARE
rec RECORD;
payload TEXT;
column_name TEXT;
column_value TEXT;
payload_items TEXT[];
BEGIN
-- Set record row depending on operation
CASE TG_OP
WHEN 'INSERT', 'UPDATE' THEN
rec := NEW;
WHEN 'DELETE' THEN
rec := OLD;
ELSE
RAISE EXCEPTION 'Unknown TG_OP: "%". Should not occur!', TG_OP;
END CASE;
-- Get required fields
FOREACH column_name IN ARRAY TG_ARGV LOOP
EXECUTE format('SELECT $1.%I::TEXT', column_name)
INTO column_value
USING rec;
payload_items := array_append(payload_items, '"' || replace(column_name, '"', '\"') || '":"' || replace(column_value, '"', '\"') || '"');
END LOOP;
-- Build the payload
payload := ''
|| '{'
|| '"timestamp":"' || CURRENT_TIMESTAMP || '",'
|| '"operation":"' || TG_OP || '",'
|| '"schema":"' || TG_TABLE_SCHEMA || '",'
|| '"table":"' || TG_TABLE_NAME || '",'
|| '"data":{' || array_to_string(payload_items, ',') || '}'
|| '}';
-- Notify the channel
PERFORM pg_notify('db_notifications', payload);
RETURN rec;
END;
$trigger$ LANGUAGE plpgsql;
@mStirner
Copy link

@mStirner I think it's because you're trying to coalesce a JSONB type with a JSON type. Maybe try using jsonb_build_object without the cast instead of json_build_object()::JSONB?

I have no idea from sql (postgresql)... I have opend a qustion on StackExcahnge (https://dba.stackexchange.com/questions/275297/postgresql-enable-notification-via-trigger-function?noredirect=1#comment539912_275297) some one there writed:

The || operator for jsonb was introduced in 9.5 and is not available in your un-supported version

But i have no idea how to solve this...

@raulsaeztapia
Copy link

First of all, thanks for writing this code.

In order to have dynamic types into column_value, instead all values as String, I have changed it as:

DECLARE
  rec RECORD;
  payload TEXT;
  column_name TEXT;
  column_value JSONB;
  payload_items JSONB;
BEGIN
  -- Set record row depending on operation
  CASE TG_OP
  WHEN 'INSERT', 'UPDATE' THEN
     rec := NEW;
  WHEN 'DELETE' THEN
     rec := OLD;
  ELSE
     RAISE EXCEPTION 'Unknown TG_OP: "%". Should not occur!', TG_OP;
  END CASE;
  
  -- Get required fields
  FOREACH column_name IN ARRAY TG_ARGV LOOP
    EXECUTE format('SELECT json_build_object(''%I'', $1.%I)', column_name, column_name)
    INTO column_value
    USING rec;
    payload_items := coalesce(payload_items,'{}')::jsonb || column_value;
  END LOOP;

  -- Build the payload
  payload := json_build_object(
    'timestamp',CURRENT_TIMESTAMP,
    'operation',TG_OP,
    'schema',TG_TABLE_SCHEMA,
    'table',TG_TABLE_NAME,
    'data',payload_items
  );

  -- Notify the channel
  PERFORM pg_notify('db_notifications', payload);
  
  RETURN rec;
END;
$trigger$ LANGUAGE plpgsql;

As result we get:
{"data": {"id": 1111, "hash": 2222, "type": "LIBRARY", "score": 3.03, "viewed": false, "version": 0, "state_id": 1, "lastenv_id": 3, "environment": "SERVER", "firstenv_id": 3, "occurrences": 2, "application_id": 3333, "lastocc urrence": "2021-08-25T16:43:54.376+00:00", "firstoccurrence": "2021-08-25T16:43:54.376+00:00", "organization_id": 4444, "processingstate": "CREATED", "statechangetime": null, "lastappversion_id": 5555, "firstappversion_id": 6666}, "table": "table_things", "schema": "public", "operation": "UPDATE", "timestamp": "2021-09-01T17:06:59.447523+00:00"}

@sguldemond
Copy link

Thanks for this! I was looking for a design, where I can listen to changes in the database and act on them from a Python application I'm writing. Only after some research I noticed that the NOTIFY/LISTEN design is not great for this purpose. Because when there are no listeners, all payloads from NOTIFY are instantly lost, and I need to make sure I act on all relevant changes. This scenario is also discussed here: https://stackoverflow.com/questions/23087347/what-happens-with-a-notify-when-no-session-has-issued-listen-in-postgresql

I repurposed most of the code to serve my needs and write trigger data to a new database. Also I'm persisting both the OLD and NEW and an extra string to identify the trigger.

persist_trigger.sql:

-- Create table to store trigger data in
CREATE TABLE invopro.trigger_data(
  id SERIAL PRIMARY KEY,
  timestamp TIMESTAMP,
  operation TEXT,
  schema_name TEXT,
  table_name TEXT,
  trigger TEXT,
  data_old JSONB,
  data_new JSONB,
  processed BOOLEAN DEFAULT FALSE
);

-- Create function to store data from trigger in table
DROP FUNCTION IF EXISTS invopro.persist_trigger() CASCADE;
CREATE FUNCTION invopro.persist_trigger() RETURNS trigger AS $trigger$
DECLARE
  rec RECORD;
  payload TEXT;
  column_name TEXT;
  column_value JSONB;
  payload_items_new JSONB;
  payload_items_old JSONB;
  event_name TEXT;
  fields TEXT[];
BEGIN
  -- Set record row depending on operation
  CASE TG_OP
  WHEN 'INSERT', 'UPDATE' THEN
     rec := NEW;
  WHEN 'DELETE' THEN
     rec := OLD;
  ELSE
     RAISE EXCEPTION 'Unknown TG_OP: "%". Should not occur!', TG_OP;
  END CASE;

  event_name = TG_ARGV[0];
  fields = TG_ARGV[1];

  -- Get old values of required fields
  FOREACH column_name IN ARRAY fields LOOP
    EXECUTE format('SELECT json_build_object(''%I'', $1.%I)', column_name, column_name)
    INTO column_value
    USING OLD;
    payload_items_old := coalesce(payload_items_old,'{}')::jsonb || column_value;
  END LOOP;

  -- Get new values of required fields
  FOREACH column_name IN ARRAY fields LOOP
    EXECUTE format('SELECT json_build_object(''%I'', $1.%I)', column_name, column_name)
    INTO column_value
    USING NEW;
    payload_items_new := coalesce(payload_items_new,'{}')::jsonb || column_value;
  END LOOP;

  INSERT INTO invopro.trigger_data (
    timestamp,
    operation,
    schema_name,
    table_name,
    trigger,
    data_old,
    data_new
  )
  VALUES (
    CURRENT_TIMESTAMP,
    TG_OP,
    TG_TABLE_SCHEMA,
    TG_TABLE_NAME,
    event_name,
    payload_items_old,
    payload_items_new
  );

  RETURN rec;
END;
$trigger$ LANGUAGE plpgsql;

create_triggers.sql:

CREATE TRIGGER move_notify_state_change
  AFTER UPDATE OF state
  ON public.account_move
  FOR EACH ROW
  EXECUTE PROCEDURE invopro.persist_trigger(
    'state_change',
    '{id, move_type, state}'
  );

CREATE TRIGGER move_notify_edi_state_change
  AFTER UPDATE OF edi_state
  ON public.account_move
  FOR EACH ROW
  EXECUTE PROCEDURE invopro.persist_trigger(
    'edi_state_change',
    '{id, move_type, edi_state}'
  );

CREATE TRIGGER move_notify_payment_state_change
  AFTER UPDATE OF payment_state
  ON public.account_move
  FOR EACH ROW
  EXECUTE PROCEDURE invopro.persist_trigger(
    'payment_state_change',
    '{id, move_type, payment_state}'
  );

@deft01
Copy link

deft01 commented Jun 20, 2022

Hey, first of all, thanks so much for writing this! I noticed that this function doesn't work if the column is itself a json value using PG's built-in JSON/JSONB data types. So I modified your function to support it, hope it is helpful.

CREATE FUNCTION notify_trigger() RETURNS trigger AS $trigger$
DECLARE
  rec RECORD;
  payload TEXT;
  column_name TEXT;
  column_value TEXT;
  payload_items JSONB;
BEGIN
  -- Set record row depending on operation
  CASE TG_OP
  WHEN 'INSERT', 'UPDATE' THEN
     rec := NEW;
  WHEN 'DELETE' THEN
     rec := OLD;
  ELSE
     RAISE EXCEPTION 'Unknown TG_OP: "%". Should not occur!', TG_OP;
  END CASE;
  
  -- Get required fields
  FOREACH column_name IN ARRAY TG_ARGV LOOP
    EXECUTE format('SELECT $1.%I::TEXT', column_name)
    INTO column_value
    USING rec;
    payload_items := coalesce(payload_items,'{}')::jsonb || json_build_object(column_name,column_value)::jsonb;
  END LOOP;

  -- Build the payload
  payload := json_build_object(
    'timestamp',CURRENT_TIMESTAMP,
    'operation',TG_OP,
    'schema',TG_TABLE_SCHEMA,
    'table',TG_TABLE_NAME,
    'data',payload_items
  );

  -- Notify the channel
  PERFORM pg_notify('db_notifications', payload);
  
  RETURN rec;
END;
$trigger$ LANGUAGE plpgsql;```

thanks !

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment