Created
September 15, 2023 10:22
-
-
Save ismasan/5f9c4956333dd72a75a256d1f1a19e1e to your computer and use it in GitHub Desktop.
A PG function to rebalance consumers in a group
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- rebalance_consumers_all_groups()
--
-- Rebalances the consumers of every group listed in consumer_groups.
-- For each group id, in this order:
--   1. Compute the minimum last_seq across ALL consumers currently in the
--      group (expired ones included, because this runs before the delete).
--   2. If that minimum is not NULL, store it in consumer_groups.last_seq
--      and stamp consumer_groups.rebalanced_at with the current UTC time.
--   3. Delete the group's consumers whose live_until is in the past.
--   4. If at least one consumer was deleted, set last_seq of every
--      remaining consumer in the group to the minimum from step 1.
--
-- Takes no arguments and returns void; its only effects are the writes to
-- consumer_groups and consumers described above.
--
-- NOTE(review): `current_timestamp AT TIME ZONE 'UTC'` yields a
-- `timestamp without time zone`. This presumably expects live_until and
-- rebalanced_at to be plain `timestamp` columns holding UTC values —
-- confirm against the table definitions.
CREATE OR REPLACE FUNCTION public.rebalance_consumers_all_groups()
 RETURNS void
 LANGUAGE plpgsql
AS $function$
DECLARE
    p_group_id        uuid;    -- id of the group being processed
    min_last_seq      bigint;  -- lowest last_seq among the group's consumers
    consumers_deleted bigint;  -- rows removed by the expiry DELETE
BEGIN
    -- Visit every group id once.
    FOR p_group_id IN
        SELECT DISTINCT id FROM consumer_groups
    LOOP
        -- Taken BEFORE expired consumers are removed, so the position of a
        -- consumer that is about to be deleted still counts toward the minimum.
        SELECT MIN(last_seq)
          INTO min_last_seq
          FROM consumers
         WHERE group_id = p_group_id;

        -- MIN() is NULL when the group has no consumers (or none with a
        -- last_seq); in that case the group row is left untouched.
        IF min_last_seq IS NOT NULL THEN
            UPDATE consumer_groups
               SET last_seq = min_last_seq,
                   rebalanced_at = current_timestamp AT TIME ZONE 'UTC'
             WHERE id = p_group_id;
        END IF;

        -- Drop consumers of this group whose liveness deadline has passed.
        DELETE FROM consumers
         WHERE group_id = p_group_id
           AND live_until < current_timestamp AT TIME ZONE 'UTC';

        GET DIAGNOSTICS consumers_deleted = ROW_COUNT;

        -- Only when membership actually changed: move every remaining
        -- consumer of the group to the minimum computed above.
        IF consumers_deleted > 0 THEN
            UPDATE consumers
               SET last_seq = min_last_seq
             WHERE group_id = p_group_id;
        END IF;
    END LOOP;
END;
$function$;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment