Skip to content

Instantly share code, notes, and snippets.

@corporatepiyush
Forked from marcocitus/rollups.sql
Created November 14, 2019 18:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save corporatepiyush/73406d699afb6a553d64fe0486a114e6 to your computer and use it in GitHub Desktop.
Save corporatepiyush/73406d699afb6a553d64fe0486a114e6 to your computer and use it in GitHub Desktop.
Efficient real-time rollups with backfilling in Citus
-- Bookkeeping table: one row per rollup, recording the last generation that
-- has already been rolled up. A newly inserted rollup starts at -1 (the
-- column default), i.e. nothing has been rolled up yet.
CREATE TABLE rollups (
    -- Rollup identifier. The primary key guarantees that a lookup by name
    -- matches at most one row, so row locks and updates keyed on the name
    -- are unambiguous.
    name text PRIMARY KEY,
    -- Highest generation already rolled up; never NULL so that
    -- "rolled_up_generation + 1" always yields a usable start generation.
    rolled_up_generation bigint NOT NULL DEFAULT -1
);
-- Create a stub on workers to allow usage as a default in distributed tables.
-- The stub has the same name and signature as the real function defined
-- below, but it always returns the constant 1 and takes no lock.
-- NOTE(review): presumably the coordinator's definition is the one actually
-- evaluated when rows are inserted, so the stub's value is never stored --
-- confirm against the Citus version in use.
SELECT run_command_on_workers($$
CREATE OR REPLACE FUNCTION current_rollup_generation(rollup_name text)
RETURNS bigint LANGUAGE sql
AS $function$
SELECT 1::bigint
$function$
$$);
-- Return the current generation of a rollup and hold a shared advisory lock
-- on it until the end of the transaction.
--
-- rollup_name: name of the sequence that counts generations for the rollup.
-- Returns:     the sequence's last value, or 0 when pg_sequence_last_value
--              returns NULL.
--
-- NOTE(review): the function is declared STABLE although it acquires a lock;
-- the declaration is kept as-is because callers may depend on it -- confirm
-- before changing the volatility.
CREATE OR REPLACE FUNCTION current_rollup_generation(rollup_name text)
RETURNS bigint
LANGUAGE plpgsql
STABLE
AS $function$
DECLARE
    generation bigint;
BEGIN
    generation := coalesce(pg_sequence_last_value(rollup_name), 0);

    -- Shared lock keyed on the generation number, released at transaction end.
    PERFORM pg_advisory_xact_lock_shared(generation);

    RETURN generation;
END;
$function$;
-- Find a safe range of generations to roll up.
--
-- rollup_name:      name of the rollup; must match a row in rollups and the
--                   name of the sequence that counts its generations.
-- start_generation: (OUT) first generation not yet rolled up.
-- end_generation:   (OUT) last generation that is closed by this call.
--
-- Raises an exception when rollup_name has no row in rollups. The check runs
-- before the sequence is advanced, so a failed call does not start a new
-- generation.
CREATE OR REPLACE FUNCTION safe_rollup_window(rollup_name text, OUT start_generation bigint, OUT end_generation bigint)
RETURNS record
LANGUAGE plpgsql
AS $function$
BEGIN
    -- Use SELECT ... FOR UPDATE to prevent concurrent rollups
    SELECT rolled_up_generation + 1 INTO start_generation
    FROM rollups WHERE name = rollup_name FOR UPDATE;

    -- Without a bookkeeping row the window start is unknown and the UPDATE
    -- below would silently do nothing; fail loudly instead.
    IF NOT FOUND THEN
        RAISE EXCEPTION 'rollup "%" is not registered in the rollups table', rollup_name;
    END IF;

    -- Start a new generation
    end_generation := nextval(rollup_name) - 1;

    -- Block until all data from past generations has been ingested:
    -- take the exclusive advisory lock for every generation in the window
    -- (held until the end of the transaction).
    PERFORM pg_advisory_xact_lock(generation)
    FROM generate_series(start_generation, end_generation) generation;

    -- We expect to be rolling up to end_generation
    UPDATE rollups SET rolled_up_generation = end_generation WHERE name = rollup_name;
END;
$function$;
-- Raw event table. Each row is stamped, through the column default, with the
-- value of current_rollup_generation('my_rollup') at insert time.
CREATE TABLE data (
    tenant_id         int,
    event             text,
    time              timestamptz DEFAULT now(),
    rollup_generation bigint      DEFAULT current_rollup_generation('my_rollup')
);

-- BRIN index supporting the generation range filter used by the rollup query.
CREATE INDEX ON data USING BRIN (rollup_generation);

-- Distribute the table across workers by tenant.
SELECT create_distributed_table('data', 'tenant_id');
-- Rollup target: one row per (tenant_id, key, day) holding the number of
-- events counted so far.
CREATE TABLE daily_roll (
    tenant_id int,
    key text,
    day date,
    -- bigint rather than int: the value is fed by count(*), which yields
    -- bigint, and keeps growing as later rollups add to it.
    counter bigint,
    PRIMARY KEY (tenant_id, key, day)
);

-- Distribute by tenant, the same distribution column as the data table.
SELECT create_distributed_table('daily_roll', 'tenant_id');
-- Register the rollup; rolled_up_generation takes its column default.
INSERT INTO rollups (name) VALUES ('my_rollup');
-- Generation counter for the rollup. The sequence name must equal the rollup
-- name, because the rollup functions pass that name to nextval() and
-- pg_sequence_last_value().
CREATE SEQUENCE my_rollup;
-- Roll up all not-yet-aggregated events of 'my_rollup' into daily_roll.
--
-- Obtains the window of generations from safe_rollup_window('my_rollup'),
-- counts the events in that window per (tenant_id, event, day) and upserts
-- the counts, adding to any counter that already exists for the same key.
-- NOTE(review): time::date converts using the session time zone, so the day
-- bucket presumably depends on that setting -- confirm this is intended.
CREATE OR REPLACE FUNCTION do_rollup()
RETURNS void
LANGUAGE plpgsql
AS $function$
DECLARE
    start_gen bigint;
    end_gen bigint;
BEGIN
    SELECT start_generation, end_generation INTO start_gen, end_gen
    FROM safe_rollup_window('my_rollup');

    -- Explicit target columns and named grouping keys keep the statement
    -- correct if columns are ever added to or reordered in either table.
    INSERT INTO daily_roll (tenant_id, key, day, counter)
    SELECT
        tenant_id,
        event,
        time::date,
        count(*)
    FROM data
    WHERE rollup_generation BETWEEN start_gen AND end_gen
    GROUP BY tenant_id, event, time::date
    ON CONFLICT (tenant_id, key, day)
    DO UPDATE SET counter = daily_roll.counter + EXCLUDED.counter;
END;
$function$;
-- Demo: ingest three events for tenant 1, roll them up and inspect the result.
-- time and rollup_generation are filled in by their column defaults.
INSERT INTO data (tenant_id, event) VALUES (1, 'hello');
INSERT INTO data (tenant_id, event) VALUES (1, 'world');
INSERT INTO data (tenant_id, event) VALUES (1, 'hello');
SELECT do_rollup();
SELECT tenant_id, key, day, counter
FROM daily_roll
ORDER BY tenant_id, key, day;

-- Ingest one more event and roll up again; the upsert adds the new count to
-- the counter that already exists for the same (tenant_id, key, day).
INSERT INTO data (tenant_id, event) VALUES (1, 'hello');
SELECT do_rollup();
SELECT tenant_id, key, day, counter
FROM daily_roll
ORDER BY tenant_id, key, day;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment