Skip to content

Instantly share code, notes, and snippets.

@cbrianpace
Last active February 24, 2025 14:37
Show Gist options
  • Select an option

  • Save cbrianpace/55e454f04b0c4feb799aef30f1b416c1 to your computer and use it in GitHub Desktop.

Select an option

Save cbrianpace/55e454f04b0c4feb799aef30f1b416c1 to your computer and use it in GitHub Desktop.
Postgres Parallel Query Statement Queuing
/**
* MIT License
*
* Copyright (c) 2024-2025 Brian Pace
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
CREATE OR REPLACE FUNCTION public.query_large_table_func(filter_value numeric)
RETURNS TABLE(id integer, value double precision)
LANGUAGE plpgsql
AS $function$
DECLARE
max_workers numeric := current_setting('max_parallel_workers')::numeric;
running_workers numeric;
pct_avail int;
trash text;
attempt int := 0;
BEGIN
-- Fetch the initial count of running parallel workers
SELECT COUNT(*)::numeric INTO running_workers
FROM pg_stat_activity
WHERE backend_type = 'parallel worker';
-- Calculate initial percentage of available workers
pct_avail := round(((max_workers - running_workers) / max_workers) * 100);
-- Check for availability and loop if below 50% availability, with a limit on attempts
WHILE pct_avail < 50 AND attempt <= 10 LOOP
RAISE LOG 'Waiting for available parallel processes... Max workers: %, Available workers: %',
max_workers, max_workers - running_workers;
-- Sleep for 2 seconds and re-check
PERFORM pg_sleep(2);
attempt := attempt + 1;
-- Recalculate the number of running workers and availability percentage
SELECT COUNT(*)::numeric INTO running_workers
FROM pg_stat_activity
WHERE backend_type = 'parallel worker';
pct_avail := round(((max_workers - running_workers) / max_workers) * 100);
END LOOP;
-- Error if Max Attempt was reached
IF attempt > 10
THEN
RAISE EXCEPTION 'Insufficient parallel workers: Max: %, Running: %, Available: %',
max_workers, running_workers, max_workers - running_workers
USING ERRCODE = '57014',
HINT = 'Try again later when more parallel workers are available.';
END IF;
-- Execute the query with parallelism
RETURN QUERY
SELECT lt.id, lt.value
FROM large_table lt
WHERE lt.value > filter_value
ORDER BY lt.id DESC;
END;
$function$
;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment