Skip to content

Instantly share code, notes, and snippets.

@molind
Created July 30, 2021 09:34
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save molind/ab52c5540cd33b54b1249b3484b1ca38 to your computer and use it in GitHub Desktop.
Save molind/ab52c5540cd33b54b1249b3484b1ca38 to your computer and use it in GitHub Desktop.
Parallel select and Parallel query
-- These functions use dblink, so the extension must be enabled in the database:
-- CREATE EXTENSION dblink;
-- You may also need to grant your user permission to use it:
-- GRANT EXECUTE ON FUNCTION dblink_connect_u(text) TO user;
-- GRANT EXECUTE ON FUNCTION dblink_connect_u(text, text) TO user;
-- Usage example:
-- select g_parsel('insert into osm_polygon_extra select osm_id, st_pointonsurface( st_collect( geom ) ) from osm_polygons group by osm_id;', 'osm_polygons', 12);
CREATE OR REPLACE FUNCTION public.g_parsel(query text, table_to_chunk text, num_chunks integer DEFAULT 2)
RETURNS text
LANGUAGE plpgsql
-- VOLATILE: the function opens dblink connections and makes them run
-- arbitrary (usually data-modifying) statements, so it must never be treated
-- as side-effect free by the planner.
VOLATILE
AS $function$
-- Parallel select: runs `query` num_chunks times concurrently, each copy on
-- its own dblink connection and restricted to one slice of table_to_chunk.
--
-- Parameters:
--   query          SQL text to run. Every textual occurrence of
--                  table_to_chunk in it is replaced by
--                  "<table_to_chunk> WHERE abs(osm_id % num_chunks) = k "
--                  (k = 0 .. num_chunks-1), so the statement must still be
--                  valid SQL after that substitution and the table must
--                  expose an osm_id column that supports the % operator.
--   table_to_chunk Name of the table to slice; matched as a plain substring.
--   num_chunks     Number of slices / parallel connections (default 2).
--
-- Returns 'Success' once every chunk has finished without error.
-- Raises the original error (after closing all connections it opened) if a
-- connection cannot be made, a chunk cannot be dispatched, or a chunk fails.
--
-- NOTE: the connection string is hard-coded to 'dbname=osm'.
DECLARE
    conn text;
    part text;
    subquery text;
    dispatch_error text;
    num_done integer;
    -- Connections conn_1 .. conn_<num_opened> were opened by this call and
    -- are the only ones the cleanup handler may close.
    num_opened integer := 0;
BEGIN
    IF query IS NULL OR table_to_chunk IS NULL THEN
        RAISE EXCEPTION 'query and table_to_chunk must not be NULL';
    END IF;
    IF num_chunks IS NULL OR num_chunks < 1 THEN
        RAISE EXCEPTION 'num_chunks must be a positive integer, got %', num_chunks;
    END IF;

    -- Dispatch one asynchronous query per chunk.
    FOR i IN 1..num_chunks LOOP
        -- for debugging
        RAISE NOTICE 'Chunk %', i;

        conn := 'conn_' || i;
        PERFORM dblink_connect_u(conn, 'dbname=osm');
        num_opened := i;

        -- Restrict the chunked table to the rows belonging to this slice.
        part := table_to_chunk || ' WHERE abs(osm_id % ' || num_chunks || ') = ' || (i - 1) || ' ';
        subquery := replace(query, table_to_chunk, part);
        RAISE NOTICE '%', subquery;

        -- dblink_send_query returns 1 when the query was dispatched, 0 otherwise.
        IF dblink_send_query(conn, subquery) IS DISTINCT FROM 1 THEN
            RAISE EXCEPTION 'could not dispatch chunk % on %: %', i, conn, dblink_error_message(conn);
        END IF;
    END LOOP;

    -- Poll until every connection has finished its query.
    LOOP
        num_done := 0;
        FOR i IN 1..num_chunks LOOP
            conn := 'conn_' || i;
            -- dblink_is_busy also consumes pending server input, which is
            -- what makes a failed async query visible to dblink_error_message.
            IF dblink_is_busy(conn) = 0 THEN
                dispatch_error := dblink_error_message(conn);
                IF dispatch_error <> 'OK' THEN
                    RAISE EXCEPTION '%', dispatch_error;
                END IF;
                num_done := num_done + 1;
            END IF;
        END LOOP;

        -- Check before sleeping so a finished run does not wait an extra second.
        EXIT WHEN num_done >= num_chunks;
        PERFORM pg_sleep(1);
    END LOOP;

    -- Disconnect the dblinks.
    FOR i IN 1..num_chunks LOOP
        PERFORM dblink_disconnect('conn_' || i);
    END LOOP;

    RETURN 'Success';

-- Cleanup on any failure, including query cancel (which OTHERS alone does not
-- match): named dblink connections outlive the statement, so leaking them
-- would make the next call fail with a duplicate connection name.
EXCEPTION WHEN query_canceled OR others THEN
    FOR n IN 1..num_opened LOOP
        -- Guard each disconnect separately so one failure (e.g. a connection
        -- that is already closed) does not stop the remaining cleanup.
        BEGIN
            PERFORM dblink_disconnect('conn_' || n);
        EXCEPTION WHEN others THEN
            RAISE NOTICE '% %', SQLERRM, SQLSTATE;
        END;
    END LOOP;
    -- Re-throw the original error so the caller sees the real cause.
    RAISE;
END
$function$
;
-- Parallel query AKA pquery
-- It runs multiple queries in parallel and exits when everything is done.
-- Usage example
-- select public.g_pquery(ARRAY[
-- 'CREATE INDEX "osm_polygons_extra_polyZoom" ON osm_polygons_extra USING brin(polyZoom);',
-- 'CREATE INDEX "osm_polygons_extra_centerPoint" ON osm_polygons_extra USING gist(centerPoint);'
-- ]);
CREATE OR REPLACE FUNCTION public.g_pquery(queries text[])
RETURNS void
LANGUAGE plpgsql
-- VOLATILE: the function opens dblink connections and makes them run
-- arbitrary statements (the usage example builds indexes), so it must never
-- be treated as side-effect free by the planner.
VOLATILE
AS $function$
-- Parallel query: dispatches every element of `queries` asynchronously on its
-- own dblink connection (pq_1, pq_2, ...) and returns once all have finished.
--
-- Parameters:
--   queries  SQL statements to run concurrently. An empty array is a no-op;
--            a NULL array or NULL element is rejected.
--
-- Raises the original error (after closing all connections it opened) if a
-- connection cannot be made, a query cannot be dispatched, or a query fails.
--
-- NOTE: the connection string is hard-coded to 'dbname=osm'.
DECLARE
    subquery text;
    conn text;
    dispatch_error text;
    num_done integer;
    -- Connections pq_1 .. pq_<num_opened> were opened by this call; after the
    -- dispatch loop this is also the number of queries in flight.
    num_opened integer := 0;
BEGIN
    IF queries IS NULL THEN
        RAISE EXCEPTION 'queries must not be NULL';
    END IF;

    -- Dispatch one asynchronous query per array element.
    FOREACH subquery IN ARRAY queries LOOP
        -- for debugging
        RAISE NOTICE 'Query %', num_opened + 1;

        IF subquery IS NULL THEN
            RAISE EXCEPTION 'query % is NULL', num_opened + 1;
        END IF;

        conn := 'pq_' || (num_opened + 1);
        PERFORM dblink_connect_u(conn, 'dbname=osm');
        num_opened := num_opened + 1;

        RAISE NOTICE '%', subquery;
        -- dblink_send_query returns 1 when the query was dispatched, 0 otherwise.
        IF dblink_send_query(conn, subquery) IS DISTINCT FROM 1 THEN
            RAISE EXCEPTION 'could not dispatch query % on %: %', num_opened, conn, dblink_error_message(conn);
        END IF;
    END LOOP;

    -- Poll until every connection has finished its query.
    LOOP
        num_done := 0;
        FOR i IN 1..num_opened LOOP
            conn := 'pq_' || i;
            -- dblink_is_busy also consumes pending server input, which is
            -- what makes a failed async query visible to dblink_error_message.
            IF dblink_is_busy(conn) = 0 THEN
                dispatch_error := dblink_error_message(conn);
                IF dispatch_error <> 'OK' THEN
                    RAISE EXCEPTION '%', dispatch_error;
                END IF;
                num_done := num_done + 1;
            END IF;
        END LOOP;

        -- Check before sleeping so a finished (or empty) run does not wait
        -- an extra second.
        EXIT WHEN num_done >= num_opened;
        PERFORM pg_sleep(1);
    END LOOP;

    -- Disconnect the dblinks.
    FOR i IN 1..num_opened LOOP
        PERFORM dblink_disconnect('pq_' || i);
    END LOOP;

-- Cleanup on any failure, including query cancel (which OTHERS alone does not
-- match): named dblink connections outlive the statement, so leaking them
-- would make the next call fail with a duplicate connection name.
EXCEPTION WHEN query_canceled OR others THEN
    FOR n IN 1..num_opened LOOP
        -- Guard each disconnect separately so one failure (e.g. a connection
        -- that is already closed) does not stop the remaining cleanup.
        BEGIN
            PERFORM dblink_disconnect('pq_' || n);
        EXCEPTION WHEN others THEN
            RAISE NOTICE '% %', SQLERRM, SQLSTATE;
        END;
    END LOOP;
    -- Re-throw the original error so the caller sees the real cause.
    RAISE;
END
$function$
;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment