Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save begriffs/fa5615061afcd7284c7ed05d0df96da1 to your computer and use it in GitHub Desktop.
Save begriffs/fa5615061afcd7284c7ed05d0df96da1 to your computer and use it in GitHub Desktop.
Some Postgres audit logging customizations for use in PostgREST
-- Deploy myproject:application_function__benchmark to pg
-- requires: application__schema
BEGIN;
SET search_path TO application;
-- One result row of a benchmark run.
CREATE TYPE _bench_result AS (
    function_name TEXT,  -- label of the timed expression ('[Control]' for the empty loop)
    runtime       REAL,  -- elapsed time of the run, in seconds (epoch difference)
    corrected     REAL   -- runtime with the control run's time subtracted
);
CREATE FUNCTION _benchmark(
run_times INT,
funcs_to_test VARIADIC TEXT []
)
RETURNS SETOF _bench_result
AS $$
DECLARE
code TEXT := '';
rec _bench_result;
BEGIN
-- Start building the custom benchmarking function.
code := $_$
CREATE OR REPLACE FUNCTION _bench(run_times INTEGER)
RETURNS SETOF _bench_result AS $__$
DECLARE
s TIMESTAMP;
e TIMESTAMP;
rec RECORD;
d numeric;
res numeric;
ret _bench_result;
BEGIN
-- Create control.
s := timeofday();
FOR rec IN SELECT TRUE FROM generate_series( 1, $_$ || run_times || $_$ )
LOOP
END LOOP;
e := timeofday();
d := extract(epoch from e) - extract(epoch from s);
ret := ROW( '[Control]', d, 0 );
RETURN NEXT ret;
$_$;
-- Append the code to bench each function call.
FOR i IN array_lower(funcs_to_test, 1) .. array_upper(funcs_to_test, 1) LOOP
code := code || '
s := timeofday();
FOR rec IN SELECT ' || funcs_to_test [i] || ' FROM generate_series( 1, '
|| run_times || $__$ ) LOOP
END LOOP;
e := timeofday();
res := extract(epoch from e) - extract(epoch from s);
ret := ROW(
$__$ || quote_literal(funcs_to_test [i]) || $__$,
res,
res - d
);
RETURN NEXT ret;
$__$;
-- Deploy myproject:application_function_get_jwt_claims to pg
-- requires: application_type_jwt_claims
BEGIN;
SET search_path TO application;
-- Read the `postgrest.claims.*` settings of the current session and return
-- them as a single application.jwt_claims value.
--
-- The four settings are fetched by one statement. If that statement raises
-- undefined_object, nothing is assigned and every claim stays NULL, so the
-- caller always gets a row back instead of an error.
--
-- NOTE(review): only iss, exp, sub and rol are supplied to the row built
-- below; any further attribute of application.jwt_claims is not set here.
-- NOTE(review): the exp setting is cast straight to TIMESTAMPTZ, so it is
-- presumably stored in a timestamp-parseable form -- confirm. A failing cast
-- is not covered by the handler below.
CREATE FUNCTION get_jwt_claims()
RETURNS application.jwt_claims
AS $$
DECLARE
    issuer     TEXT;
    expires_at TIMESTAMPTZ;
    subject    TEXT;
    role_name  TEXT;
    result     application.jwt_claims;
BEGIN
    -- Inner block exists only to scope the exception handler.
    BEGIN
        SELECT
            current_setting('postgrest.claims.iss'),
            CAST(current_setting('postgrest.claims.exp') AS TIMESTAMPTZ),
            current_setting('postgrest.claims.sub'),
            current_setting('postgrest.claims.rol')
        INTO issuer, expires_at, subject, role_name;
    EXCEPTION
        WHEN undefined_object THEN
            -- Deliberately ignored: the locals keep their NULL defaults.
            NULL;
    END;

    -- Assigned through the typed variable rather than returned directly.
    result := ROW(issuer, expires_at, subject, role_name);
    RETURN result;
END;
$$ LANGUAGE plpgsql;
GRANT EXECUTE ON FUNCTION get_jwt_claims() TO viewer;
COMMIT;
-- Deploy myproject:application_function_jsonb_minus to pg
-- requires: application__schema
BEGIN;
SET search_path TO application;
-- Object difference: returns the members of arg1 that are absent from arg2 or
-- whose value differs from arg2's value for the same key.
--
--   * When a differing value is itself an object and arg2 has a value for that
--     key, the two are diffed recursively, so only the nested members that
--     differ are kept.
--   * Returns '{}' when nothing differs.
--   * STRICT: returns NULL if either argument is NULL.
--
-- The recursive call is schema-qualified on purpose. An unqualified call is
-- resolved through the caller's search_path when the function runs, so the
-- nested case failed whenever `application` was not on that path (the audit
-- trigger invokes this through OPERATOR(application.-) for exactly that reason).
CREATE FUNCTION jsonb_minus(
    arg1 JSONB,
    arg2 JSONB
)
RETURNS JSONB
AS $$
    SELECT
        COALESCE(
            jsonb_object_agg(
                key,
                CASE
                    -- if the value is an object and the second argument has
                    -- a value for this key, recurse into it
                    WHEN jsonb_typeof(value) = 'object' AND (arg2 -> key) IS NOT NULL
                        THEN application.jsonb_minus(value, arg2 -> key)
                    -- every other type is returned as is
                    ELSE value
                END
            ),
            '{}' :: JSONB
        )
    FROM
        jsonb_each(arg1)
    WHERE
        (arg1 -> key) <> (arg2 -> key)
        OR (arg2 -> key) IS NULL
$$ LANGUAGE SQL IMMUTABLE STRICT;
GRANT EXECUTE ON FUNCTION jsonb_minus (JSONB, JSONB) TO public;
-- Infix form of jsonb_minus(JSONB, JSONB):  left - right.
CREATE OPERATOR - (
    LEFTARG   = JSONB,
    RIGHTARG  = JSONB,
    PROCEDURE = jsonb_minus
);
-- Key removal: returns arg1 without the top-level members whose key is listed
-- in arg2. Returns '{}' when every member is removed. STRICT: returns NULL if
-- either argument is NULL.
--
-- Membership is tested with NOT EXISTS rather than `key <> ALL (arg2)`. With a
-- NULL element in arg2 the <> ALL form evaluates to NULL for every key, which
-- silently dropped all members. NULL elements are now simply ignored.
CREATE FUNCTION
jsonb_minus(
    arg1 JSONB,
    arg2 TEXT[]
)
RETURNS JSONB
AS $$
    SELECT
        COALESCE(jsonb_object_agg(key, value), '{}' :: JSONB)
    FROM jsonb_each(arg1)
    WHERE NOT EXISTS (
        SELECT 1
        FROM unnest(arg2) AS removed (key_name)
        WHERE removed.key_name = key
    )
$$ LANGUAGE SQL IMMUTABLE STRICT;
GRANT EXECUTE ON FUNCTION jsonb_minus (JSONB,TEXT[]) TO public;
-- Infix form of jsonb_minus(JSONB, TEXT[]):  object - array_of_keys.
CREATE OPERATOR - (
    LEFTARG   = JSONB,
    RIGHTARG  = TEXT[],
    PROCEDURE = jsonb_minus
);
COMMIT;
-- Deploy myproject:application_function_update_modified_column to pg
-- requires: application_schema
BEGIN;
SET search_path TO application;
-- Trigger function that sets NEW.modified_at to now() and returns the
-- modified row. The target table must have a modified_at column.
CREATE FUNCTION update_modified_column()
RETURNS TRIGGER
AS $$
BEGIN
    NEW.modified_at := now();
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;
COMMIT;
-- Deploy myproject:application_table_users to pg
-- requires: application_function_encrypt_pass
BEGIN;
SET search_path TO application;
-- Enumerations backing users.user_type and users.user_role.
CREATE TYPE user_types AS ENUM ('local');
CREATE TYPE user_roles AS ENUM ('viewer');

-- Application accounts, keyed by username.
CREATE TABLE users (
    username TEXT PRIMARY KEY
        -- Length cap on the key itself. This check previously referenced a
        -- non-existent column (first_name), which made CREATE TABLE fail.
        CONSTRAINT username_size_50 CHECK (length(username) < 50),
    email TEXT NOT NULL
        -- Case-insensitive shape check; the final label must be 2-6 letters.
        CONSTRAINT email_format_valid CHECK (email ~* '^[A-Z0-9._%+-]+@(?:[A-Z0-9-]+\.)+[A-Z]{2,6}$'),
    user_role user_roles NOT NULL,
    user_refresh_exp_mins INT NOT NULL,
    user_type user_types NOT NULL DEFAULT 'local',
    -- NOTE(review): check constraints are evaluated after BEFORE triggers, so
    -- this cap presumably applies to whatever the encrypt_pass trigger stores,
    -- not to the submitted password -- confirm the stored form fits.
    password TEXT
        CONSTRAINT password_size_50 CHECK (length(password) < 50),
    auth_claims JSONB,
    enabled BOOLEAN NOT NULL DEFAULT FALSE,
    verified BOOLEAN NOT NULL DEFAULT FALSE,
    -- 'local' accounts must carry a password; any other type must carry claims.
    CONSTRAINT user_type_and_assertion_match CHECK (
        (user_type = 'local' AND password IS NOT NULL)
        OR
        (user_type != 'local' AND auth_claims IS NOT NULL))
);
-- Run application.encrypt_pass() on every row before it is inserted or updated.
CREATE TRIGGER encrypt_pass
    BEFORE INSERT OR UPDATE
    ON application.users
    FOR EACH ROW
    EXECUTE PROCEDURE application.encrypt_pass();
SELECT audit.audit_table('application.users');
COMMIT;
-- Deploy myproject:application_type_jwt_claims to pg
-- requires: application__schema
BEGIN;
SET search_path TO application;
-- Composite carrying JWT claim values; attribute names follow the claim names.
CREATE TYPE jwt_claims AS (
    iss  TEXT,
    exp  TIMESTAMPTZ,
    sub  TEXT,
    rol  NAME,
    role NAME
);
COMMIT;
-- Deploy myproject:audit__schema to pg
BEGIN;
-- Dedicated schema for the audit log and its trigger functions.
CREATE SCHEMA audit;

COMMENT ON SCHEMA audit
    IS 'Out-of-table audit/history logging tables and trigger functions';

-- Closed by default; only the viewer role is given USAGE.
REVOKE ALL
    ON SCHEMA audit
    FROM public;
GRANT USAGE
    ON SCHEMA audit
    TO viewer;
COMMIT;
-- Deploy myproject:audit_function_audit_table to pg
-- requires: audit_function_if_modified
BEGIN;
SET search_path TO audit;
-- Attach the audit.if_modified() triggers to a table.
--
-- Arguments:
--   table_to_audit   Table name, schema-qualified if not on the search_path.
--   audit_rows       TRUE: create a row-level trigger for INSERT/UPDATE/DELETE
--                    and a statement-level trigger for TRUNCATE only.
--                    Otherwise: create only a statement-level trigger covering
--                    INSERT, UPDATE, DELETE and TRUNCATE.
--   audit_query_text Passed to the triggers as argument 0.
--   ignored_cols     When non-empty, passed to the row-level trigger as
--                    argument 1 (as a text[] literal).
--
-- Raises whatever CREATE TRIGGER raises (for example when a trigger of the
-- same name already exists), and an error if table_to_audit does not name an
-- existing relation.
CREATE FUNCTION audit_table(
    table_to_audit   NAME,
    audit_rows       BOOLEAN,
    audit_query_text BOOLEAN,
    ignored_cols     TEXT []
)
RETURNS VOID
AS $$
DECLARE
    stm_targets        TEXT = 'INSERT OR UPDATE OR DELETE OR TRUNCATE';
    _q_txt             TEXT;
    _ignored_cols_snip TEXT = '';
    -- The name is spliced into dynamic DDL, so it is resolved through regclass
    -- first: that rejects anything that is not an existing relation and
    -- yields a correctly quoted (and, where needed, schema-qualified) name.
    _target            TEXT = table_to_audit :: TEXT :: REGCLASS :: TEXT;
BEGIN
    IF audit_rows
    THEN
        IF array_length(ignored_cols, 1) > 0
        THEN
            _ignored_cols_snip = ', ' || quote_literal(ignored_cols);
        END IF;
        _q_txt = 'CREATE TRIGGER audit_trigger_row AFTER INSERT OR UPDATE OR DELETE ON ' ||
                 _target ||
                 ' FOR EACH ROW EXECUTE PROCEDURE audit.if_modified(' ||
                 quote_literal(audit_query_text) || _ignored_cols_snip || ');';
        EXECUTE _q_txt;
        -- Row changes are covered above; the statement trigger only needs
        -- TRUNCATE, which has no row-level form.
        stm_targets = 'TRUNCATE';
    END IF;
    _q_txt = 'CREATE TRIGGER audit_trigger_stm AFTER ' || stm_targets || ' ON ' ||
             _target ||
             ' FOR EACH STATEMENT EXECUTE PROCEDURE audit.if_modified(' ||
             quote_literal(audit_query_text) || ');';
    EXECUTE _q_txt;
END;
$$
LANGUAGE plpgsql;
COMMENT ON FUNCTION audit.audit_table(NAME, BOOLEAN, BOOLEAN, TEXT []) IS $$
Add auditing support to a table.
Arguments:
table_to_audit: Table name, schema qualified if not on search_path
audit_rows: Record each row change, or only audit at a statement level
audit_query_text: Record the text of the client query that triggered the audit event?
ignored_cols: Columns to exclude from update diffs, ignore updates that change only ignored cols.
$$;
-- Three-argument form: same as the four-argument function with an empty
-- ignored_cols array, i.e. no columns are excluded.
CREATE OR REPLACE FUNCTION audit.audit_table(table_to_audit NAME, audit_rows BOOLEAN, audit_query_text BOOLEAN)
RETURNS VOID
AS $$
    SELECT audit.audit_table(
        table_to_audit,
        audit_rows,
        audit_query_text,
        CAST('{}' AS TEXT [])
    );
$$ LANGUAGE SQL;
-- One-argument form for the common case: row-level auditing with query text
-- logging enabled (and, via the three-argument form, no excluded columns).
CREATE OR REPLACE FUNCTION audit.audit_table(table_to_audit NAME)
RETURNS VOID
AS $$
    SELECT audit.audit_table(table_to_audit, TRUE, TRUE);
$$ LANGUAGE SQL;
COMMENT ON FUNCTION audit.audit_table(NAME) IS $$
Add auditing support to the given table. Row-level changes will be logged with full client query text. No cols are ignored.
Arguments:
table_to_audit: Table name, schema qualified if not on search_path
$$;
COMMIT;
-- Deploy myproject:audit_function_if_modified to pg
-- requires: application_function_get_jwt_claims
-- requires: audit__schema
-- requires: application_function_jsonb_minus
BEGIN;
SET search_path TO audit,application;
-- Audit trigger function: writes one audit.audit_log row per audited event.
--
-- Trigger arguments (TG_ARGV):
--   [0] boolean text; when it casts to FALSE the client query text is not
--       stored. Absent or TRUE keeps it.
--   [1] text[] literal of column names removed from row_data/changed_fields.
--
-- Behaviour by event:
--   row UPDATE -> row_data = OLD, changed_fields = NEW minus OLD (both without
--                 the excluded columns). The event is skipped when no
--                 non-excluded column changed.
--   row DELETE -> row_data = OLD without the excluded columns.
--   row INSERT -> row_data = NEW without the excluded columns.
--   statement  -> stmt_only = TRUE; row_data and changed_fields stay '{}'.
--
-- Raises an exception when attached as anything but an AFTER trigger, or for
-- an operation/level combination not listed above. Always returns NULL.
--
-- The function is SECURITY DEFINER, so its search_path is pinned: name
-- resolution inside it (including functions it calls) cannot be redirected by
-- the invoking session.
CREATE FUNCTION if_modified()
RETURNS TRIGGER
AS $$
DECLARE
    audit_row     audit.audit_log;
    claims        application.jwt_claims;
    excluded_cols TEXT [] = ARRAY [] :: TEXT [];
BEGIN
    IF tg_when <> 'AFTER'
    THEN
        RAISE EXCEPTION 'audit.if_modified() may only run as an AFTER trigger';
    END IF;

    claims = application.get_jwt_claims();

    -- Field order must match the column order of audit.audit_log.
    audit_row = ROW (
        nextval('audit.audit_log_event_id_seq')
        , tg_table_schema :: TEXT
        , tg_table_name :: TEXT
        , tg_relid
        , session_user :: TEXT
        , claims.sub
        , claims.rol
        , current_timestamp
        , statement_timestamp()
        , clock_timestamp()
        , txid_current()
        , current_query()
        , tg_op
        , '{}'
        , '{}'
        , 'f'
    );

    -- Drop the query text only when argument 0 is explicitly false.
    IF tg_argv [0] :: BOOLEAN IS NOT DISTINCT FROM FALSE
    THEN
        audit_row.client_query = NULL;
    END IF;

    IF tg_argv [1] IS NOT NULL
    THEN
        excluded_cols = tg_argv [1] :: TEXT [];
    END IF;

    IF (tg_op = 'UPDATE' AND tg_level = 'ROW')
    THEN
        audit_row.row_data = to_jsonb(OLD) OPERATOR(application.-) excluded_cols;
        audit_row.changed_fields = (to_jsonb(NEW) OPERATOR(application.-) audit_row.row_data) OPERATOR(application.-) excluded_cols;
        -- The difference operators return '{}' (never NULL) when nothing is
        -- left, so an empty object is what marks "nothing relevant changed".
        IF audit_row.changed_fields = '{}' :: JSONB
        THEN
            -- All changed fields are ignored. Skip this update.
            RETURN NULL;
        END IF;
    ELSIF (tg_op = 'DELETE' AND tg_level = 'ROW')
    THEN
        audit_row.row_data = to_jsonb(OLD) OPERATOR(application.-) excluded_cols;
    ELSIF (tg_op = 'INSERT' AND tg_level = 'ROW')
    THEN
        audit_row.row_data = to_jsonb(NEW) OPERATOR(application.-) excluded_cols;
    ELSIF (tg_level = 'STATEMENT' AND tg_op IN ('INSERT', 'UPDATE', 'DELETE', 'TRUNCATE'))
    THEN
        audit_row.stmt_only = 't';
    ELSE
        RAISE EXCEPTION '[audit.if_modified] - Trigger func added as trigger for unhandled case: %, %', tg_op, tg_level;
    END IF;

    INSERT INTO audit.audit_log VALUES (audit_row.*);
    -- The return value of an AFTER trigger is ignored.
    RETURN NULL;
END;
$$ LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = audit, application, pg_temp;
COMMENT ON FUNCTION audit.if_modified() IS $$
Track changes to a table at the statement and/or row level.
Optional parameters to trigger in CREATE TRIGGER call:
param 0: boolean, whether to log the query text. Default 't'.
param 1: text[], columns to ignore in updates. Default [].
Updates to ignored cols are omitted from changed_fields.
Updates with only ignored cols changed are not inserted
into the audit log.
Almost all the processing work is still done for updates
that are ignored. If you need to avoid that load, use a
WHEN clause on the trigger instead.
No warning or error is issued if ignored_cols contains columns
that do not exist in the target table. This lets you specify
a standard set of ignored columns.
There is no parameter to disable logging of values. Add this trigger as
a 'FOR EACH STATEMENT' rather than 'FOR EACH ROW' trigger if you do not
want to log row values.
Note that the user name logged is the login role for the session. The audit trigger
cannot obtain the active role because it is reset by the SECURITY DEFINER invocation
of the audit trigger itself.
$$;
COMMIT;
-- Deploy myproject:audit_table_audit_log to pg
-- requires: audit__schema
BEGIN;
SET search_path TO audit;
-- Kinds of statement an audit event can record.
CREATE TYPE audit_actions AS ENUM ('INSERT','DELETE','UPDATE','TRUNCATE');

-- One row per audited event. The column order is significant: the audit
-- trigger function inserts a positional row value.
CREATE TABLE audit_log (
    -- event_id is the event's unique identifier; declaring it the primary key
    -- makes the database enforce that instead of only documenting it.
    event_id BIGSERIAL PRIMARY KEY,
    schema_name TEXT NOT NULL,
    table_name TEXT NOT NULL,
    relid OID NOT NULL,
    pg_session_user NAME NOT NULL,
    username TEXT,
    user_role TEXT,
    action_tx_ts TIMESTAMPTZ NOT NULL,
    action_stm_ts TIMESTAMPTZ NOT NULL,
    action_trig_ts TIMESTAMPTZ NOT NULL,
    tx_id BIGINT NOT NULL,
    client_query TEXT,
    action audit_actions NOT NULL,
    row_data JSONB NOT NULL,
    changed_fields JSONB NOT NULL,
    stmt_only BOOLEAN NOT NULL
);
-- Lookup indexes on audit.audit_log: by table OID, user, statement time, action.
CREATE INDEX audit_log_relid_idx
    ON audit.audit_log (relid);
CREATE INDEX audit_log_username_idx
    ON audit.audit_log (username);
CREATE INDEX audit_log_action_stm_ts_idx
    ON audit.audit_log (action_stm_ts);
CREATE INDEX audit_log_action_idx
    ON audit.audit_log (action);
COMMENT ON TABLE audit.audit_log IS 'History of auditable actions on audited tables, from audit.if_modified()';
COMMENT ON COLUMN audit.audit_log.event_id IS 'Unique identifier for each auditable event';
COMMENT ON COLUMN audit.audit_log.schema_name IS 'Database schema audited table for this event is in';
COMMENT ON COLUMN audit.audit_log.table_name IS 'Non-schema-qualified table name of table event occurred in';
COMMENT ON COLUMN audit.audit_log.relid IS '(INDEXED) Table OID. Changes with drop/create. Get with ''tablename''::regclass';
COMMENT ON COLUMN audit.audit_log.pg_session_user IS 'Session user whose statement caused the audited event';
COMMENT ON COLUMN audit.audit_log.username IS '(INDEXED) Application user (JWT "sub" claim) whose statement caused the audited event. Null when no claim is available.';
COMMENT ON COLUMN audit.audit_log.user_role IS 'Role of user whose statement caused the audited event';
COMMENT ON COLUMN audit.audit_log.action_tx_ts IS 'Transaction start timestamp for tx in which audited event occurred';
COMMENT ON COLUMN audit.audit_log.action_stm_ts IS '(INDEXED) Statement start timestamp for tx in which audited event occurred';
COMMENT ON COLUMN audit.audit_log.action_trig_ts IS 'Wall clock time at which audited event''s trigger call occurred';
COMMENT ON COLUMN audit.audit_log.tx_id IS 'Identifier of transaction that made the change. May wrap, but unique paired with action_tx_ts.';
COMMENT ON COLUMN audit.audit_log.client_query IS 'Top-level query that caused this auditable event. May be more than one statement.';
COMMENT ON COLUMN audit.audit_log.action IS '(INDEXED) Action type';
COMMENT ON COLUMN audit.audit_log.row_data IS 'Record value. Empty object ({}) for statement-level trigger. For INSERT this is the new tuple. For DELETE and UPDATE it is the old tuple.';
COMMENT ON COLUMN audit.audit_log.changed_fields IS 'New values of fields changed by UPDATE. Empty object ({}) except for row-level UPDATE events.';
COMMENT ON COLUMN audit.audit_log.stmt_only IS 'TRUE if audit event is from an FOR EACH STATEMENT trigger, FALSE for FOR EACH ROW';
REVOKE ALL ON TABLE audit_log FROM public;
GRANT SELECT ON TABLE audit_log TO viewer;
COMMIT;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment