Skip to content

Instantly share code, notes, and snippets.

@corporatepiyush
Forked from marcocitus/example.sql
Created November 7, 2019 13:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save corporatepiyush/18629345befa31e92ef74b0b35b98f80 to your computer and use it in GitHub Desktop.
Save corporatepiyush/18629345befa31e92ef74b0b35b98f80 to your computer and use it in GitHub Desktop.
Safe incremental rollups on Postgres and Citus
-- Raw events table: one row per page view.
-- view_id is a bigserial, so its values come from the implicitly created
-- sequence page_views_view_id_seq.
CREATE TABLE page_views (
    site_id   integer,
    path      text,
    client_ip inet,
    view_time timestamp with time zone DEFAULT now(),
    view_id   bigserial
);

-- BRIN index on the sequence ID so ranges of IDs can be looked up quickly
CREATE INDEX view_id_idx
    ON page_views
    USING BRIN (view_id);

-- Citus only: distribute the raw events by site ID
SELECT create_distributed_table('page_views', 'site_id');
-- Rollup table: number of views per (site_id, path, period_start).
-- The primary key is what the aggregation's ON CONFLICT clause targets.
CREATE TABLE page_views_1min (
    site_id      integer,
    path         text,
    period_start timestamp with time zone,
    view_count   bigint,
    PRIMARY KEY (site_id, path, period_start)
);

-- Citus only: distribute the rollup by site ID, same column as page_views
SELECT create_distributed_table('page_views_1min', 'site_id');
-- Register the 1-minute rollup: which events table it reads and which
-- sequence generates that table's event IDs.
-- NOTE: the rollups table is created further down in this listing and must
-- exist before this statement is run.
INSERT INTO rollups
    (name, event_table_name, event_id_sequence_name)
VALUES
    ('page_views_1min_rollup', 'page_views', 'page_views_view_id_seq');
-- Aggregation step for the 1-minute page view rollup.
--
-- Obtains an ID window from incremental_rollup_window('page_views_1min_rollup'),
-- counts the page_views rows whose view_id falls inside that window per
-- (site_id, path, minute) and adds the counts to page_views_1min.
--
-- Returns:
--   start_id, end_id  the inclusive view_id range handed out for this run;
--                     start_id > end_id means there was nothing to aggregate.
CREATE OR REPLACE FUNCTION do_page_view_aggregation(OUT start_id bigint, OUT end_id bigint)
RETURNS record
LANGUAGE plpgsql
AS $function$
BEGIN
    -- Find out which range of page views is safe to aggregate
    SELECT w.window_start, w.window_end
      INTO start_id, end_id
      FROM incremental_rollup_window('page_views_1min_rollup') AS w;

    -- Empty window: no new page views, leave the rollup table untouched
    IF start_id > end_id THEN
        RETURN;
    END IF;

    -- Count views per site/path/minute; when a rollup row already exists for
    -- that key, add the new count on top of the stored one
    INSERT INTO page_views_1min (site_id, path, period_start, view_count)
    SELECT
        pv.site_id,
        pv.path,
        date_trunc('minute', pv.view_time),
        count(*)
    FROM page_views AS pv
    WHERE pv.view_id >= start_id
      AND pv.view_id <= end_id
    GROUP BY
        pv.site_id,
        pv.path,
        date_trunc('minute', pv.view_time)
    ON CONFLICT (site_id, path, period_start) DO UPDATE
        SET view_count = page_views_1min.view_count + EXCLUDED.view_count;
END;
$function$;
-- Run one aggregation pass and show the ID range it was given
SELECT
    agg.start_id,
    agg.end_id
FROM do_page_view_aggregation() AS agg;
-- Bookkeeping table: one row per incremental rollup.
--   name                    identifier passed to incremental_rollup_window()
--   event_table_name        table holding the raw events
--   event_id_sequence_name  sequence that generates the event IDs
--   last_aggregated_id      highest event ID already handed out for aggregation
CREATE TABLE rollups (
    name                   text PRIMARY KEY,
    event_table_name       text NOT NULL,
    event_id_sequence_name text NOT NULL,
    -- NOT NULL is required: incremental_rollup_window() computes
    -- last_aggregated_id + 1 as the window start. A NULL here would make the
    -- start NULL, so no events match the window while the bookmark is still
    -- advanced, silently skipping those events.
    last_aggregated_id     bigint NOT NULL DEFAULT 0
);
-- Hand out the next window of event IDs that can safely be aggregated for the
-- given rollup, and remember the end of that window for the next call.
--
-- Parameters:
--   rollup_name   value of rollups.name identifying the rollup
-- Returns:
--   window_start  last aggregated ID + 1
--   window_end    last value of the event ID sequence (0 if never used)
--   window_start > window_end means there is nothing new to aggregate.
-- Raises:
--   an exception when rollup_name has no row in the rollups table.
CREATE OR REPLACE FUNCTION incremental_rollup_window(rollup_name text, OUT window_start bigint, OUT window_end bigint)
RETURNS record
LANGUAGE plpgsql
AS $function$
DECLARE
    table_to_lock regclass;
BEGIN
    /*
     * Perform aggregation from the last aggregated ID + 1 up to the last committed ID.
     * We do a SELECT .. FOR UPDATE on the row in the rollup table to prevent
     * aggregations from running concurrently.
     *
     * COALESCE guards against a NULL last_aggregated_id: without it the
     * window start would be NULL, no events would match the window, and the
     * bookmark would still be advanced past them.
     */
    SELECT event_table_name,
           COALESCE(last_aggregated_id, 0) + 1,
           pg_sequence_last_value(event_id_sequence_name)
      INTO table_to_lock, window_start, window_end
      FROM rollups
     WHERE name = rollup_name
       FOR UPDATE;

    IF NOT FOUND THEN
        RAISE 'rollup ''%'' is not in the rollups table', rollup_name;
    END IF;

    IF window_end IS NULL THEN
        /* sequence was never used */
        window_end := 0;
        RETURN;
    END IF;

    /*
     * No IDs were generated since the previous window: there is nothing to
     * wait for and nothing to record, so skip the table lock (which would
     * block writers) and the no-op UPDATE. The returned window is empty
     * (window_start > window_end), exactly as it would be otherwise.
     */
    IF window_start = window_end + 1 THEN
        RETURN;
    END IF;

    /*
     * Play a little trick: We very briefly lock the table for writes in order to
     * wait for all pending writes to finish. That way, we are sure that there are
     * no more uncommitted writes with an identifier lower or equal to window_end.
     * By throwing an exception, we release the lock immediately after obtaining it
     * such that writes can resume.
     */
    BEGIN
        EXECUTE format('LOCK %s IN SHARE ROW EXCLUSIVE MODE', table_to_lock);
        RAISE 'release table lock' USING ERRCODE = 'RLTBL';
    EXCEPTION WHEN SQLSTATE 'RLTBL' THEN
        /* expected: raised on purpose just above, nothing to handle */
        NULL;
    END;

    /*
     * Remember the end of the window to continue from there next time.
     */
    UPDATE rollups SET last_aggregated_id = window_end WHERE name = rollup_name;
END;
$function$;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment