Safe incremental rollups on Postgres and Citus
-- Create the raw events table.
-- view_id is a bigserial, so each inserted row draws its value from the
-- sequence page_views_view_id_seq. The rollup bookkeeping refers to that
-- sequence by name to find out which rows are new.
CREATE TABLE page_views (
    site_id int,
    path text,
    client_ip inet,
    view_time timestamptz default now(),
    view_id bigserial
);

-- Allow fast lookups of ranges of sequence IDs
-- (the aggregation filters on a contiguous view_id range).
CREATE INDEX view_id_idx ON page_views USING BRIN (view_id);

-- Citus only: distribute the table by site ID
SELECT create_distributed_table('page_views', 'site_id');
-- Create the rollup table: one row per (site_id, path, period_start).
-- The primary key is also the conflict target of the upsert performed by
-- do_page_view_aggregation(), so repeated runs add to existing counts.
CREATE TABLE page_views_1min (
    site_id int,
    path text,
    period_start timestamptz,
    view_count bigint,
    primary key (site_id, path, period_start)
);

-- Citus only: distribute the table by site ID
SELECT create_distributed_table('page_views_1min', 'site_id');
-- Add our 1-minute rollup to the rollups table.
-- NOTE: the rollups table must exist before this statement runs; its
-- CREATE TABLE appears further down in this listing, so execute that first.
-- last_aggregated_id is omitted here, so it takes its column default of 0.
INSERT INTO rollups (name, event_table_name, event_id_sequence_name)
VALUES ('page_views_1min_rollup', 'page_views', 'page_views_view_id_seq');
-- Define the aggregation.
--
-- do_page_view_aggregation() folds page views that have not been aggregated
-- yet into page_views_1min.
--
-- Returns (start_id, end_id): the inclusive view_id window obtained from
-- incremental_rollup_window('page_views_1min_rollup'). When start_id > end_id
-- there was nothing new and page_views_1min is left untouched.
CREATE OR REPLACE FUNCTION do_page_view_aggregation(OUT start_id bigint, OUT end_id bigint)
RETURNS record
LANGUAGE plpgsql
AS $function$
BEGIN
    /* determine which page views we can safely aggregate */
    SELECT window_start, window_end INTO start_id, end_id
    FROM incremental_rollup_window('page_views_1min_rollup');

    /* exit early if there are no new page views to aggregate */
    IF start_id > end_id THEN RETURN; END IF;

    /*
     * aggregate the page views: count them per site, path and minute, and
     * add the counts to any row that already exists for the same key
     */
    INSERT INTO page_views_1min (site_id, path, period_start, view_count)
    SELECT site_id, path, date_trunc('minute', view_time), count(*) AS view_count
    FROM page_views
    WHERE view_id BETWEEN start_id AND end_id
    GROUP BY site_id, path, date_trunc('minute', view_time)
    ON CONFLICT (site_id, path, period_start) DO UPDATE
    SET view_count = page_views_1min.view_count + EXCLUDED.view_count;
END;
$function$;
-- Run the aggregation; the single result row is the view_id window it considered
-- (start_id > end_id means there was nothing new to aggregate).
SELECT start_id, end_id FROM do_page_view_aggregation();
-- Bookkeeping table: one row per rollup.
--   name                   identifier passed to incremental_rollup_window()
--   event_table_name       table holding the raw events (briefly locked by
--                          incremental_rollup_window())
--   event_id_sequence_name sequence that numbers the raw events
--   last_aggregated_id     highest event ID already handed out for aggregation
CREATE TABLE rollups (
    name text primary key,
    event_table_name text not null,
    event_id_sequence_name text not null,
    -- NOT NULL: incremental_rollup_window() computes last_aggregated_id + 1.
    -- With a NULL bookmark that window start is NULL, nothing is aggregated,
    -- and the bookmark is still advanced, silently skipping events.
    last_aggregated_id bigint not null default 0
);
-- incremental_rollup_window(rollup_name)
--
-- Hands out the next range of event IDs that is safe to aggregate for the
-- given rollup and records the end of that range in rollups.last_aggregated_id.
--
-- Parameters:
--   rollup_name   value of rollups.name identifying the rollup
-- Returns (window_start, window_end), both inclusive:
--   window_start  last aggregated ID + 1
--   window_end    current last value of the rollup's event ID sequence, or 0
--                 if the sequence was never used
--   window_start > window_end means there is nothing new to aggregate.
-- Raises an exception if rollup_name is not present in the rollups table.
CREATE OR REPLACE FUNCTION incremental_rollup_window(rollup_name text, OUT window_start bigint, OUT window_end bigint)
RETURNS record
LANGUAGE plpgsql
AS $function$
DECLARE
    table_to_lock regclass;
BEGIN
    /*
     * Perform aggregation from the last aggregated ID + 1 up to the last committed ID.
     * We do a SELECT .. FOR UPDATE on the row in the rollup table to prevent
     * aggregations from running concurrently.
     *
     * A NULL last_aggregated_id is treated as 0. Without the COALESCE the window
     * start would be NULL, callers would aggregate nothing, and the UPDATE at the
     * end of this function would still move the bookmark past those events.
     */
    SELECT
        event_table_name,
        COALESCE(last_aggregated_id, 0) + 1,
        pg_sequence_last_value(event_id_sequence_name)
    INTO table_to_lock, window_start, window_end
    FROM rollups
    WHERE name = rollup_name FOR UPDATE;

    IF NOT FOUND THEN
        RAISE 'rollup ''%'' is not in the rollups table', rollup_name;
    END IF;

    IF window_end IS NULL THEN
        /* sequence was never used */
        window_end := 0;
        RETURN;
    END IF;

    /*
     * Play a little trick: We very briefly lock the table for writes in order to
     * wait for all pending writes to finish. That way, we are sure that there are
     * no more uncommitted writes with an identifier lower or equal to window_end.
     * By throwing an exception, we release the lock immediately after obtaining it
     * such that writes can resume.
     */
    BEGIN
        /* table_to_lock is a regclass, so %s renders it as a valid, quoted-if-needed name */
        EXECUTE format('LOCK %s IN SHARE ROW EXCLUSIVE MODE', table_to_lock);
        RAISE 'release table lock' USING ERRCODE = 'RLTBL';
    EXCEPTION WHEN SQLSTATE 'RLTBL' THEN
        /* swallow only our own marker error; any other error still propagates */
        NULL;
    END;

    /*
     * Remember the end of the window to continue from there next time.
     */
    UPDATE rollups SET last_aggregated_id = window_end WHERE name = rollup_name;
END;
$function$;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment