@colophonemes
Last active February 17, 2024 15:15
Postgres TRIGGER to call NOTIFY with a JSON payload
CREATE TRIGGER person_notify AFTER INSERT OR UPDATE OR DELETE ON person
FOR EACH ROW EXECUTE PROCEDURE notify_trigger(
'id',
'email',
'username'
);
CREATE TRIGGER income_notify AFTER INSERT OR UPDATE OR DELETE ON income
FOR EACH ROW EXECUTE PROCEDURE notify_trigger(
'id',
'person_id',
'amount',
'currency_code',
'start_date',
'end_date',
'data_source'
);
const { Client } = require('pg')
// pg.connect() was removed in pg 7, so create a dedicated client instead
const client = new Client({ connectionString: process.env.DATABASE_URL })
// Handle notifications
client.on('notification', function (msg) {
  // msg.payload is the JSON string built by notify_trigger()
  const payload = msg.payload
  console.log(payload)
  // Send payload into a queue etc...
})
// Connect to the DB, then listen for NOTIFY calls
client.connect()
  .then(function () {
    return client.query('LISTEN db_notifications')
  })
  .catch(function (err) {
    console.error(err)
  })
-- Trigger notification for messaging to PG Notify
CREATE FUNCTION notify_trigger() RETURNS trigger AS $trigger$
DECLARE
  rec RECORD;
  payload TEXT;
  column_name TEXT;
  column_value TEXT;
  payload_items TEXT[];
BEGIN
  -- Set record row depending on operation
  CASE TG_OP
  WHEN 'INSERT', 'UPDATE' THEN
    rec := NEW;
  WHEN 'DELETE' THEN
    rec := OLD;
  ELSE
    RAISE EXCEPTION 'Unknown TG_OP: "%". Should not occur!', TG_OP;
  END CASE;
  -- Get required fields
  FOREACH column_name IN ARRAY TG_ARGV LOOP
    EXECUTE format('SELECT $1.%I::TEXT', column_name)
    INTO column_value
    USING rec;
    payload_items := array_append(payload_items, '"' || replace(column_name, '"', '\"') || '":"' || replace(column_value, '"', '\"') || '"');
  END LOOP;
  -- Build the payload
  payload := ''
    || '{'
    || '"timestamp":"' || CURRENT_TIMESTAMP || '",'
    || '"operation":"' || TG_OP || '",'
    || '"schema":"' || TG_TABLE_SCHEMA || '",'
    || '"table":"' || TG_TABLE_NAME || '",'
    || '"data":{' || array_to_string(payload_items, ',') || '}'
    || '}';
  -- Notify the channel
  PERFORM pg_notify('db_notifications', payload);
  RETURN rec;
END;
$trigger$ LANGUAGE plpgsql;
@colophonemes
Author

This TRIGGER function calls PostgreSQL's NOTIFY command with a JSON payload. You can listen for these calls and then send the JSON payload to a message queue (like AMQP/RabbitMQ) or trigger other actions.

Create the trigger with notify_trigger.sql.

When declaring the trigger, supply the column names you want the JSON payload to contain as arguments to the function (see create_triggers.sql).

The payload is a JSON object:

{
  "timestamp": "2017-01-14 22:10:49.506002+00",
  "operation": "INSERT",
  "schema": "pledges",
  "table": "income",
  "data": {
    "id": "4e565844-daa6-11e6-ad5e-4b33ec44da97",
    "person_id": "8dba0c26-da13-11e6-b5bc-474d83f61aaa",
    "amount": "12345.99",
    "currency_code": "USD",
    "start_date": "2016-01-01",
    "end_date": "2016-12-31"
  }
}
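
Because the trigger casts every selected column to TEXT, each value under "data" arrives as a JSON string, even for numeric columns such as amount. The following is a minimal consumer-side sketch, assuming Node.js; the helper name parseNotification and its numericFields parameter are hypothetical and not part of the gist.

```javascript
// Parse the JSON payload delivered with a 'notification' event and convert
// selected string fields back to numbers.
// `parseNotification` and `numericFields` are illustrative names only.
function parseNotification (msg, numericFields = []) {
  const event = JSON.parse(msg.payload)
  for (const field of numericFields) {
    if (typeof event.data[field] === 'string') {
      event.data[field] = Number(event.data[field])
    }
  }
  return event
}

// Simulated message, shaped like the example payload above
const sampleMsg = {
  channel: 'db_notifications',
  payload: JSON.stringify({
    timestamp: '2017-01-14 22:10:49.506002+00',
    operation: 'INSERT',
    schema: 'pledges',
    table: 'income',
    data: {
      id: '4e565844-daa6-11e6-ad5e-4b33ec44da97',
      amount: '12345.99',
      currency_code: 'USD'
    }
  })
}

const sampleEvent = parseNotification(sampleMsg, ['amount'])
console.log(sampleEvent.table, sampleEvent.data.amount)
```

Inside a real listener the same helper could be called from the 'notification' handler before the event is forwarded to a queue.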

Inspired by this post by Björn Gylling (@bjorngylling).

@sidmutha

sidmutha commented May 3, 2018

Hi
Just wanted to say that we packaged this idea (albeit written in C) into a Docker container that can simply be dropped alongside your Postgres instance:
https://github.com/hasura/skor
What do you think?

@shrumm

shrumm commented Oct 28, 2018

Hey, first of all, thanks so much for writing this! I noticed that this function doesn't work if the column is itself a json value using PG's built-in JSON/JSONB data types. So I modified your function to support it, hope it is helpful.

CREATE FUNCTION notify_trigger() RETURNS trigger AS $trigger$
DECLARE
  rec RECORD;
  payload TEXT;
  column_name TEXT;
  column_value TEXT;
  payload_items JSONB;
BEGIN
  -- Set record row depending on operation
  CASE TG_OP
  WHEN 'INSERT', 'UPDATE' THEN
     rec := NEW;
  WHEN 'DELETE' THEN
     rec := OLD;
  ELSE
     RAISE EXCEPTION 'Unknown TG_OP: "%". Should not occur!', TG_OP;
  END CASE;
  
  -- Get required fields
  FOREACH column_name IN ARRAY TG_ARGV LOOP
    EXECUTE format('SELECT $1.%I::TEXT', column_name)
    INTO column_value
    USING rec;
    payload_items := coalesce(payload_items,'{}')::jsonb || json_build_object(column_name,column_value)::jsonb;
  END LOOP;

  -- Build the payload
  payload := json_build_object(
    'timestamp',CURRENT_TIMESTAMP,
    'operation',TG_OP,
    'schema',TG_TABLE_SCHEMA,
    'table',TG_TABLE_NAME,
    'data',payload_items
  );

  -- Notify the channel
  PERFORM pg_notify('db_notifications', payload);
  
  RETURN rec;
END;
$trigger$ LANGUAGE plpgsql;
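
Note that this version still casts each column to TEXT before passing it to json_build_object, so a JSON or JSONB column should arrive in the payload as a JSON-encoded string rather than as a nested object. A listener can decode it on its side. Below is a minimal sketch, assuming Node.js; decodeJsonColumns and the config column are hypothetical names used only for illustration.

```javascript
// Decode columns that were sent as JSON-encoded strings.
// `decodeJsonColumns` and the `config` column are illustrative assumptions.
function decodeJsonColumns (data, jsonColumns) {
  const decoded = { ...data }
  for (const column of jsonColumns) {
    if (typeof decoded[column] === 'string') {
      decoded[column] = JSON.parse(decoded[column])
    }
  }
  return decoded
}

// Payload as it might arrive: "config" is a string that itself contains JSON
const rawPayload = '{"data":{"id":"1","config":"{\\"retries\\": 3}"}}'
const decodedData = decodeJsonColumns(JSON.parse(rawPayload).data, ['config'])
console.log(decodedData.config.retries)
```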

@SelmanKahya

that helped, thanks bud

@domq

domq commented Dec 1, 2019

Thanks for this code, which was very helpful! These days, you can simplify your code thanks to the json_object function which just so happens to turn an even-sized PostgreSQL ARRAY into a JSON object by alternating keys and values. Also, you can use pg-listen instead of pg. I demonstrated both techniques in a forked gist.
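
To illustrate the alternating key/value behaviour described above, here is a JavaScript analogue of what json_object does with a flat text array. This is not PostgreSQL code, and objectFromFlatArray is a hypothetical name; it is only meant to show the shape of the transformation.

```javascript
// JavaScript analogue of json_object(text[]): elements at even positions
// are keys, and each following element is that key's value.
// `objectFromFlatArray` is an illustrative name, not a real API.
function objectFromFlatArray (items) {
  if (items.length % 2 !== 0) {
    throw new Error('array must have an even number of elements')
  }
  const result = {}
  for (let i = 0; i < items.length; i += 2) {
    result[items[i]] = items[i + 1]
  }
  return result
}

const flatExample = objectFromFlatArray(['id', '42', 'currency_code', 'USD'])
console.log(flatExample)
```

As with the TEXT casts in the original function, every value ends up as a string.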

@marcopeg
Copy link

marcopeg commented Dec 2, 2019

Hello everyone, I am trying to get this trigger to work on the following table schema, but somehow the loop triggers an error:

CREATE TABLE IF NOT EXISTS fetchq_catalog.fetchq_sys_queues (
        id SERIAL PRIMARY KEY,
        created_at TIMESTAMP WITH TIME ZONE,
        name CHARACTER VARYING(40) NOT NULL,
        is_active BOOLEAN DEFAULT true,
        current_version INTEGER DEFAULT 0,
        max_attempts INTEGER DEFAULT 5,
        errors_retention VARCHAR(25) DEFAULT '24h',
        metrics_retention JSONB DEFAULT '[]',
        config JSON DEFAULT '{}'
    );

Can you help to identify what's wrong?

@marcopeg

marcopeg commented Dec 2, 2019

Hello again, after a few dead ends I found this solution that seems to be working just fine:

SELECT json_agg(n)::text INTO data FROM json_each_text(to_json(rec)) n;
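
As far as I can tell, json_each_text returns one (key, value) row per column, so json_agg(n) should produce an array of {"key": ..., "value": ...} objects rather than a single object. A listener would then need to fold that array back into a row. Here is a minimal sketch, assuming Node.js; rowFromKeyValueArray and the sample values are hypothetical.

```javascript
// Fold an array of { key, value } objects back into one plain object.
// `rowFromKeyValueArray` is an illustrative name.
function rowFromKeyValueArray (pairs) {
  return Object.fromEntries(pairs.map(({ key, value }) => [key, value]))
}

// Sample of the assumed output shape; the values are made up
const aggregated = '[{"key":"id","value":"1"},{"key":"name","value":"emails"}]'
const queueRow = rowFromKeyValueArray(JSON.parse(aggregated))
console.log(queueRow)
```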

@whomakemecrazy

whomakemecrazy commented Aug 8, 2020

Hello, how about this?
I added a new function, is_json:

create or replace function is_json(text)
returns boolean language plpgsql immutable as $$
begin
    perform $1::json;
    return true;
exception
    when invalid_text_representation then
        return false;
end $$;

then

   CASE
        WHEN is_json(column_value) THEN
            payload_items := coalesce(payload_items,'{}')::jsonb || json_build_object(column_name,column_value::jsonb)::jsonb;
        ELSE 
            payload_items := coalesce(payload_items,'{}')::jsonb || json_build_object(column_name,column_value)::jsonb;
    END CASE;
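
One thing to keep in mind with this approach: any text value that happens to be valid JSON is embedded as JSON, so a text column containing 123 or true would likely arrive as a number or boolean instead of a string. The JavaScript analogue below (not PostgreSQL code; embedValue is a hypothetical name) mirrors the same decision logic to show the effect.

```javascript
// Mirror of the is_json branch: embed valid JSON as-is, otherwise keep
// the value as a plain string. `embedValue` is an illustrative name.
function embedValue (columnValue) {
  try {
    return JSON.parse(columnValue)
  } catch (err) {
    return columnValue
  }
}

const embedded = ['{"a": 1}', 'hello', '123', 'true'].map(embedValue)
console.log(embedded)
```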

@bsingr

bsingr commented Sep 9, 2020

I'm doing something similar, but instead of coalesce I am collecting arrays of keys and values and using json_object instead of json_build_object.

Also, instead of CREATE OR REPLACE FUNCTION I run DROP FUNCTION IF EXISTS ... CASCADE first. That way all table triggers that depend on the function are dropped (cleaned up) on each start of the Node process.

Also, for the actual listening I am using this library: https://github.com/andywer/pg-listen

Not sure if that's better. Just found this and wanted to share my approach.

Trigger function:

  DROP FUNCTION IF EXISTS ${schemaName}.${functionName}() CASCADE;
  CREATE FUNCTION ${schemaName}.${functionName}() RETURNS trigger AS $$
  DECLARE
    notification json;
    new_or_changed_record_as_jsonb jsonb;
    channelName text;
    fields text[];
    field text;
    extracted_fields text[];
    extracted_values text[];
  BEGIN
    -- TG_ARGV[0]: 1st trigger argument must be channel name
    channelName = TG_ARGV[0];

    -- TG_ARGV[1]: 2nd trigger argument must be list of fields to include in JSON
    fields = TG_ARGV[1];

    -- create temp. jsonb version of changed or created record 1
    IF (TG_OP = 'DELETE') THEN
      new_or_changed_record_as_jsonb = to_jsonb(OLD);
    ELSE
      new_or_changed_record_as_jsonb = to_jsonb(NEW);
    END IF;

    -- build json dynamically for a given list of fields
    FOREACH field IN ARRAY fields LOOP
      extracted_fields = array_append(extracted_fields, field);
      extracted_values = array_append(extracted_values, new_or_changed_record_as_jsonb->>field);
    END LOOP;

    notification = json_build_object(
      'type', TG_OP,
      'dateTime', clock_timestamp(),
      'table', TG_TABLE_NAME::text,
      'schema', TG_TABLE_SCHEMA::text,
      'data', json_object(extracted_fields, extracted_values)
    );

    PERFORM pg_notify(channelName, notification::text);
    RETURN NULL;
  END;
  $$ LANGUAGE plpgsql;
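
Since this function takes the channel name as its first trigger argument and the field list as its second, the matching CREATE TRIGGER statement can be generated from the same Node process. The sketch below is an assumption about how that might look, not the commenter's actual code: buildTriggerSql, quoteIdent, quoteLiteral, the trigger naming scheme and the notify_changes function name are all hypothetical, and field names are assumed to contain no commas, braces or quotes.

```javascript
// Quote an SQL identifier by doubling embedded double quotes.
function quoteIdent (name) {
  return '"' + name.replace(/"/g, '""') + '"'
}

// Quote an SQL string literal by doubling embedded single quotes.
function quoteLiteral (value) {
  return "'" + value.replace(/'/g, "''") + "'"
}

// Build a CREATE TRIGGER statement that passes the channel name and a
// PostgreSQL array literal of field names to the trigger function.
function buildTriggerSql ({ schemaName, functionName, tableName, channelName, fields }) {
  const fieldList = '{' + fields.join(',') + '}'
  return [
    `CREATE TRIGGER ${quoteIdent(tableName + '_notify')}`,
    `AFTER INSERT OR UPDATE OR DELETE ON ${quoteIdent(schemaName)}.${quoteIdent(tableName)}`,
    `FOR EACH ROW EXECUTE PROCEDURE ${quoteIdent(schemaName)}.${quoteIdent(functionName)}(`,
    `  ${quoteLiteral(channelName)}, ${quoteLiteral(fieldList)}`,
    ');'
  ].join('\n')
}

const triggerSql = buildTriggerSql({
  schemaName: 'public',
  functionName: 'notify_changes', // hypothetical function name
  tableName: 'income',
  channelName: 'db_notifications',
  fields: ['id', 'amount']
})
console.log(triggerSql)
```

Because the function is dropped with CASCADE on each start, the triggers have to be recreated afterwards, which is where a generator like this would be called.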

@mStirner

mStirner commented Sep 10, 2020

I have tried to build this setup, but get the error:

ERROR: operator does not exist: jsonb || jsonb at character 44
HINT: No operator matches the given name and argument type(s). You might need to add explicit type casts.
QUERY: SELECT coalesce(payload_items,'{}')::jsonb || json_build_object(column_name,column_value)::jsonb
CONTEXT: PL/pgSQL function notify_trigger() line 24 at assignment

my notify_trigger function looks like this (from @shrumm):

CREATE FUNCTION notify_trigger() RETURNS trigger AS $trigger$
DECLARE
  rec RECORD;
  payload TEXT;
  column_name TEXT;
  column_value TEXT;
  payload_items JSONB;
BEGIN
  -- Set record row depending on operation
  CASE TG_OP
  WHEN 'INSERT', 'UPDATE' THEN
     rec := NEW;
  WHEN 'DELETE' THEN
     rec := OLD;
  ELSE
     RAISE EXCEPTION 'Unknown TG_OP: "%". Should not occur!', TG_OP;
  END CASE;
  
  -- Get required fields
  FOREACH column_name IN ARRAY TG_ARGV LOOP
    EXECUTE format('SELECT $1.%I::TEXT', column_name)
    INTO column_value
    USING rec;
    payload_items := coalesce(payload_items,'{}')::jsonb || json_build_object(column_name,column_value)::jsonb;
  END LOOP;

  -- Build the payload
  payload := json_build_object(
    'timestamp',CURRENT_TIMESTAMP,
    'operation',TG_OP,
    'schema',TG_TABLE_SCHEMA,
    'table',TG_TABLE_NAME,
    'data',payload_items
  );

  -- Notify the channel
  PERFORM pg_notify('db_notifications', payload);
  
  RETURN rec;
END;
$trigger$ LANGUAGE plpgsql;

Can anyone help me?

PostgreSQL version is 9.4.24.

@colophonemes
Author

@mStirner I think it's because you're trying to coalesce a JSONB type with a JSON type. Maybe try using jsonb_build_object without the cast instead of json_build_object()::JSONB?

@mStirner

@mStirner I think it's because you're trying to coalesce a JSONB type with a JSON type. Maybe try using jsonb_build_object without the cast instead of json_build_object()::JSONB?

I don't know much about SQL (PostgreSQL)... I have opened a question on Stack Exchange (https://dba.stackexchange.com/questions/275297/postgresql-enable-notification-via-trigger-function?noredirect=1#comment539912_275297) and someone there wrote:

The || operator for jsonb was introduced in 9.5 and is not available in your un-supported version

But I have no idea how to solve this...
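
If the quoted answer is right that the || operator for jsonb only exists from 9.5 onwards, an application could check the server version before installing the JSONB-based trigger function. Here is a minimal sketch, assuming Node.js and a version string such as the one returned by SHOW server_version; supportsJsonbConcat is a hypothetical helper.

```javascript
// Decide from a server_version string whether jsonb || jsonb is available
// (assumed to require PostgreSQL 9.5 or later, per the quoted answer).
// `supportsJsonbConcat` is an illustrative name.
function supportsJsonbConcat (serverVersion) {
  // Keep only the leading "major.minor" part, e.g. "14.5 (Debian ...)" -> "14.5"
  const [major, minor = 0] = serverVersion.split(' ')[0].split('.').map(Number)
  return major > 9 || (major === 9 && minor >= 5)
}

console.log(supportsJsonbConcat('9.4.24'))
console.log(supportsJsonbConcat('9.5.0'))
```

On a server where this returns false, the original TEXT[]-based notify_trigger function from the gist avoids the operator altogether.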

@raulsaeztapia

First of all, thanks for writing this code.

To keep each column's native type in column_value, instead of turning every value into a string, I changed it as follows:

CREATE FUNCTION notify_trigger() RETURNS trigger AS $trigger$
DECLARE
  rec RECORD;
  payload TEXT;
  column_name TEXT;
  column_value JSONB;
  payload_items JSONB;
BEGIN
  -- Set record row depending on operation
  CASE TG_OP
  WHEN 'INSERT', 'UPDATE' THEN
     rec := NEW;
  WHEN 'DELETE' THEN
     rec := OLD;
  ELSE
     RAISE EXCEPTION 'Unknown TG_OP: "%". Should not occur!', TG_OP;
  END CASE;
  
  -- Get required fields
  FOREACH column_name IN ARRAY TG_ARGV LOOP
    EXECUTE format('SELECT json_build_object(%L, $1.%I)', column_name, column_name)
    INTO column_value
    USING rec;
    payload_items := coalesce(payload_items,'{}')::jsonb || column_value;
  END LOOP;

  -- Build the payload
  payload := json_build_object(
    'timestamp',CURRENT_TIMESTAMP,
    'operation',TG_OP,
    'schema',TG_TABLE_SCHEMA,
    'table',TG_TABLE_NAME,
    'data',payload_items
  );

  -- Notify the channel
  PERFORM pg_notify('db_notifications', payload);
  
  RETURN rec;
END;
$trigger$ LANGUAGE plpgsql;

As a result we get:
{"data": {"id": 1111, "hash": 2222, "type": "LIBRARY", "score": 3.03, "viewed": false, "version": 0, "state_id": 1, "lastenv_id": 3, "environment": "SERVER", "firstenv_id": 3, "occurrences": 2, "application_id": 3333, "lastoccurrence": "2021-08-25T16:43:54.376+00:00", "firstoccurrence": "2021-08-25T16:43:54.376+00:00", "organization_id": 4444, "processingstate": "CREATED", "statechangetime": null, "lastappversion_id": 5555, "firstappversion_id": 6666}, "table": "table_things", "schema": "public", "operation": "UPDATE", "timestamp": "2021-09-01T17:06:59.447523+00:00"}

@sguldemond

Thanks for this! I was looking for a design where I can listen to changes in the database and act on them from a Python application I'm writing. Only after some research did I notice that the NOTIFY/LISTEN design is not great for this purpose: when there are no listeners, all payloads from NOTIFY are instantly lost, and I need to make sure I act on all relevant changes. This scenario is also discussed here: https://stackoverflow.com/questions/23087347/what-happens-with-a-notify-when-no-session-has-issued-listen-in-postgresql

I repurposed most of the code to serve my needs and write the trigger data to a new table instead. I'm also persisting both OLD and NEW, plus an extra string to identify the trigger.

persist_trigger.sql:

-- Create table to store trigger data in
CREATE TABLE invopro.trigger_data(
  id SERIAL PRIMARY KEY,
  timestamp TIMESTAMP,
  operation TEXT,
  schema_name TEXT,
  table_name TEXT,
  trigger TEXT,
  data_old JSONB,
  data_new JSONB,
  processed BOOLEAN DEFAULT FALSE
);

-- Create function to store data from trigger in table
DROP FUNCTION IF EXISTS invopro.persist_trigger() CASCADE;
CREATE FUNCTION invopro.persist_trigger() RETURNS trigger AS $trigger$
DECLARE
  rec RECORD;
  payload TEXT;
  column_name TEXT;
  column_value JSONB;
  payload_items_new JSONB;
  payload_items_old JSONB;
  event_name TEXT;
  fields TEXT[];
BEGIN
  -- Set record row depending on operation
  CASE TG_OP
  WHEN 'INSERT', 'UPDATE' THEN
     rec := NEW;
  WHEN 'DELETE' THEN
     rec := OLD;
  ELSE
     RAISE EXCEPTION 'Unknown TG_OP: "%". Should not occur!', TG_OP;
  END CASE;

  event_name = TG_ARGV[0];
  fields = TG_ARGV[1];

  -- Get old values of required fields
  FOREACH column_name IN ARRAY fields LOOP
    EXECUTE format('SELECT json_build_object(%L, $1.%I)', column_name, column_name)
    INTO column_value
    USING OLD;
    payload_items_old := coalesce(payload_items_old,'{}')::jsonb || column_value;
  END LOOP;

  -- Get new values of required fields
  FOREACH column_name IN ARRAY fields LOOP
    EXECUTE format('SELECT json_build_object(%L, $1.%I)', column_name, column_name)
    INTO column_value
    USING NEW;
    payload_items_new := coalesce(payload_items_new,'{}')::jsonb || column_value;
  END LOOP;

  INSERT INTO invopro.trigger_data (
    timestamp,
    operation,
    schema_name,
    table_name,
    trigger,
    data_old,
    data_new
  )
  VALUES (
    CURRENT_TIMESTAMP,
    TG_OP,
    TG_TABLE_SCHEMA,
    TG_TABLE_NAME,
    event_name,
    payload_items_old,
    payload_items_new
  );

  RETURN rec;
END;
$trigger$ LANGUAGE plpgsql;

create_triggers.sql:

CREATE TRIGGER move_notify_state_change
  AFTER UPDATE OF state
  ON public.account_move
  FOR EACH ROW
  EXECUTE PROCEDURE invopro.persist_trigger(
    'state_change',
    '{id, move_type, state}'
  );

CREATE TRIGGER move_notify_edi_state_change
  AFTER UPDATE OF edi_state
  ON public.account_move
  FOR EACH ROW
  EXECUTE PROCEDURE invopro.persist_trigger(
    'edi_state_change',
    '{id, move_type, edi_state}'
  );

CREATE TRIGGER move_notify_payment_state_change
  AFTER UPDATE OF payment_state
  ON public.account_move
  FOR EACH ROW
  EXECUTE PROCEDURE invopro.persist_trigger(
    'payment_state_change',
    '{id, move_type, payment_state}'
  );
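
Once rows accumulate in trigger_data, a consumer can work out what actually changed by comparing data_old with data_new. The following is a minimal sketch, assuming Node.js and a row whose JSONB columns have already been decoded into objects; changedFields and the sample values are hypothetical.

```javascript
// Compare data_old and data_new of one trigger_data row and report the
// fields whose values differ. `changedFields` is an illustrative name.
function changedFields (row) {
  const oldData = row.data_old || {}
  const newData = row.data_new || {}
  const keys = new Set([...Object.keys(oldData), ...Object.keys(newData)])
  const changes = {}
  for (const key of keys) {
    // Compare serialized forms so nested values are compared by content
    if (JSON.stringify(oldData[key]) !== JSON.stringify(newData[key])) {
      changes[key] = { from: oldData[key], to: newData[key] }
    }
  }
  return changes
}

// Sample row; the values are made up for illustration
const sampleRow = {
  trigger: 'state_change',
  data_old: { id: 7, move_type: 'out_invoice', state: 'draft' },
  data_new: { id: 7, move_type: 'out_invoice', state: 'posted' }
}

const stateChanges = changedFields(sampleRow)
console.log(stateChanges)
```

After handling a row, the consumer would presumably set its processed column to TRUE so the row is not picked up again.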

@deft01

deft01 commented Jun 20, 2022

Hey, first of all, thanks so much for writing this! I noticed that this function doesn't work if the column is itself a json value using PG's built-in JSON/JSONB data types. So I modified your function to support it, hope it is helpful.

CREATE FUNCTION notify_trigger() RETURNS trigger AS $trigger$
DECLARE
  rec RECORD;
  payload TEXT;
  column_name TEXT;
  column_value TEXT;
  payload_items JSONB;
BEGIN
  -- Set record row depending on operation
  CASE TG_OP
  WHEN 'INSERT', 'UPDATE' THEN
     rec := NEW;
  WHEN 'DELETE' THEN
     rec := OLD;
  ELSE
     RAISE EXCEPTION 'Unknown TG_OP: "%". Should not occur!', TG_OP;
  END CASE;
  
  -- Get required fields
  FOREACH column_name IN ARRAY TG_ARGV LOOP
    EXECUTE format('SELECT $1.%I::TEXT', column_name)
    INTO column_value
    USING rec;
    payload_items := coalesce(payload_items,'{}')::jsonb || json_build_object(column_name,column_value)::jsonb;
  END LOOP;

  -- Build the payload
  payload := json_build_object(
    'timestamp',CURRENT_TIMESTAMP,
    'operation',TG_OP,
    'schema',TG_TABLE_SCHEMA,
    'table',TG_TABLE_NAME,
    'data',payload_items
  );

  -- Notify the channel
  PERFORM pg_notify('db_notifications', payload);
  
  RETURN rec;
END;
$trigger$ LANGUAGE plpgsql;

Thanks!
