Skip to content

Instantly share code, notes, and snippets.

@ismasan
Created September 15, 2023 10:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ismasan/5f9c4956333dd72a75a256d1f1a19e1e to your computer and use it in GitHub Desktop.
Save ismasan/5f9c4956333dd72a75a256d1f1a19e1e to your computer and use it in GitHub Desktop.
A PG function to rebalance consumers in a group
CREATE OR REPLACE FUNCTION public.rebalance_consumers_all_groups()
RETURNS void
LANGUAGE plpgsql
AS $function$
DECLARE
group_ids uuid[];
min_last_seq bigint;
consumers_deleted bigint;
group_min_seq bigint;
p_group_id uuid;
BEGIN
-- Step 1: Get distinct group names
SELECT ARRAY(SELECT DISTINCT id FROM consumer_groups) INTO group_ids;
-- Step 2: Loop through each group name
FOREACH p_group_id IN ARRAY group_ids
LOOP
consumers_deleted := 0; -- Initialize a flag to track if consumers were deleted
group_min_seq := 0;
-- Compute the minimum last_seq for the current group_name
SELECT MIN(last_seq) INTO min_last_seq FROM consumers WHERE group_id = p_group_id;
-- Update group.last seq to min seq
IF min_last_seq IS NOT NULL THEN
UPDATE consumer_groups
SET last_seq = min_last_seq, rebalanced_at = current_timestamp AT TIME ZONE 'UTC'
WHERE id = p_group_id;
END IF;
-- Delete consumer records with live_until older than the current time for the current group_name
DELETE FROM consumers
WHERE group_id = p_group_id AND live_until < current_timestamp AT TIME ZONE 'UTC';
-- Check if any consumers were deleted
GET DIAGNOSTICS consumers_deleted = ROW_COUNT;
-- If consumers were deleted, update remaining records for the current group_name with the computed min_last_seq
IF consumers_deleted > 0 THEN
UPDATE consumers
SET last_seq = min_last_seq
WHERE group_id = p_group_id;
END IF;
END LOOP;
END;
$function$
;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment