Skip to content

Instantly share code, notes, and snippets.

@cabecada
Forked from molind/parsel.sql
Created December 23, 2023 16:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cabecada/ecc836858b7fa0e099d1ed946aff9f51 to your computer and use it in GitHub Desktop.
Save cabecada/ecc836858b7fa0e099d1ed946aff9f51 to your computer and use it in GitHub Desktop.
Parallel select function for PostgreSQL.
--
-- Before using it, enable the dblink extension in the database and allow the
-- calling role to run dblink_connect_u (it is not executable by PUBLIC by default).
-- You may need to replace the connection string 'dbname=osm' used by the function
-- below with your own database's connection options.
-- CREATE EXTENSION dblink;
-- GRANT EXECUTE ON FUNCTION dblink_connect_u(text) TO your_role;
-- GRANT EXECUTE ON FUNCTION dblink_connect_u(text, text) TO your_role;
--
-- public.g_parsel: run QUERY in NUM_CHUNKS parallel dblink sessions, each one
-- restricted to a modulo-slice of TABLE_TO_CHUNK, and block until all finish.
--
-- Parameters
--   query          SQL text to execute. Every occurrence of table_to_chunk in it
--                  (plain, case-sensitive text match) is replaced by
--                  "<table_to_chunk> WHERE abs(<chunk_column> % <num_chunks>) = <k> "
--                  for chunk k = 0 .. num_chunks-1.
--   table_to_chunk table name exactly as it is written in query.
--   num_chunks     number of parallel sessions / slices; must be >= 1.
--   connstr        libpq connection string used for every dblink session.
--   chunk_column   column (SQL expression, inserted verbatim) used to split rows.
-- Returns 'Success' when every chunk completed without error.
-- Raises on invalid arguments, on connection/dispatch failures and on the first
-- remote error; connections opened by this call are cancelled and closed first.
--
-- Caveats
--   * The text replace also matches substrings (e.g. "roads" inside "roads_new"),
--     and the injected WHERE breaks queries that already have a WHERE clause
--     directly after the table name.
--   * Rows whose chunk_column is NULL satisfy no chunk's predicate and are skipped.
--   * Query results are never fetched (no dblink_get_result), so only the side
--     effects of query (INSERT / UPDATE / CREATE TABLE AS ...) matter.
--   * Each chunk runs in its own remote session: it cannot see uncommitted work of
--     the calling transaction, and chunks that already finished are not rolled
--     back if another chunk fails (unless query manages its own transaction).
--   * Connection names conn_1 .. conn_<num_chunks> must be free in this session.
--
-- Drop the legacy 3-argument signature so 2- and 3-argument calls do not become
-- ambiguous with the 5-argument function created below.
DROP FUNCTION IF EXISTS public.g_parsel(query text, table_to_chunk text, num_chunks integer);
CREATE OR REPLACE FUNCTION public.g_parsel(
    query          text,
    table_to_chunk text,
    num_chunks     integer DEFAULT 2,
    connstr        text    DEFAULT 'dbname=osm',
    chunk_column   text    DEFAULT 'osm_id'
)
RETURNS text AS
$BODY$
DECLARE
    conn           text;          -- dblink connection name of the current chunk
    part           text;          -- replacement text: table name plus chunk predicate
    subquery       text;          -- query rewritten to cover one chunk only
    num_done       integer;       -- chunks seen finished during the current poll
    n_opened       integer := 0;  -- conn_1 .. conn_<n_opened> were opened by this call
    dispatch_error text;
BEGIN
    -- Validate up front: a table name missing from the query would make every
    -- session run the full, unchunked query (num_chunks times over), and
    -- num_chunks < 1 would report success without running anything.
    IF query IS NULL OR table_to_chunk IS NULL OR connstr IS NULL OR chunk_column IS NULL THEN
        RAISE EXCEPTION 'g_parsel: query, table_to_chunk, connstr and chunk_column must not be NULL';
    END IF;
    IF table_to_chunk = '' OR strpos(query, table_to_chunk) = 0 THEN
        RAISE EXCEPTION 'g_parsel: table_to_chunk "%" does not occur in query', table_to_chunk;
    END IF;
    IF num_chunks IS NULL OR num_chunks < 1 THEN
        RAISE EXCEPTION 'g_parsel: num_chunks must be at least 1, got %', num_chunks;
    END IF;

    BEGIN
        -- Dispatch: open one connection per chunk and start its slice asynchronously.
        FOR i IN 1..num_chunks LOOP
            RAISE NOTICE 'Chunk %', i;
            conn := 'conn_' || i;
            PERFORM dblink_connect_u(conn, connstr);
            n_opened := i;
            part := table_to_chunk || ' WHERE abs(' || chunk_column || ' % ' || num_chunks
                    || ') = ' || (i - 1) || ' ';
            subquery := replace(query, table_to_chunk, part);
            RAISE NOTICE '%', subquery;
            -- dblink_send_query returns 1 on successful dispatch, 0 otherwise.
            IF dblink_send_query(conn, subquery) <> 1 THEN
                RAISE EXCEPTION 'g_parsel: could not dispatch chunk %: %', i, dblink_error_message(conn);
            END IF;
        END LOOP;

        -- Wait: poll every connection until none is busy. dblink_is_busy consumes
        -- pending input, which lets dblink_error_message report a remote error.
        LOOP
            num_done := 0;
            FOR i IN 1..num_chunks LOOP
                conn := 'conn_' || i;
                IF dblink_is_busy(conn) = 0 THEN
                    dispatch_error := dblink_error_message(conn);
                    IF dispatch_error <> 'OK' THEN
                        RAISE EXCEPTION '%', dispatch_error;
                    END IF;
                    num_done := num_done + 1;
                END IF;
            END LOOP;
            -- Check before sleeping so we return as soon as every chunk is done.
            EXIT WHEN num_done >= num_chunks;
            PERFORM pg_sleep(1);
        END LOOP;

        -- All chunks finished cleanly: close the connections.
        FOR i IN 1..num_chunks LOOP
            PERFORM dblink_disconnect('conn_' || i);
        END LOOP;
        RETURN 'Success';
    EXCEPTION WHEN OTHERS THEN
        RAISE NOTICE '% %', SQLERRM, SQLSTATE;
        -- Best-effort cleanup of only the connections this call opened; each one
        -- is handled separately so one failure does not leave the others open.
        FOR i IN 1..n_opened LOOP
            conn := 'conn_' || i;
            BEGIN
                IF conn = ANY (dblink_get_connections()) THEN
                    -- Closing the connection may not stop a query still running
                    -- remotely, so request cancellation first.
                    PERFORM dblink_cancel_query(conn);
                    PERFORM dblink_disconnect(conn);
                END IF;
            EXCEPTION WHEN OTHERS THEN
                RAISE NOTICE '% %', SQLERRM, SQLSTATE;
            END;
        END LOOP;
        -- Re-raise the original error so the caller sees why the run failed
        -- instead of the function ending without a RETURN.
        RAISE;
    END;
END
$BODY$
-- VOLATILE: the function opens connections and runs remote statements.
LANGUAGE plpgsql VOLATILE
COST 100;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment