Skip to content

Instantly share code, notes, and snippets.

@Nhoutain
Last active September 10, 2021 13:31
Show Gist options
  • Save Nhoutain/3b9b3a88a9b6c402331f179f02a3b6c6 to your computer and use it in GitHub Desktop.
Save Nhoutain/3b9b3a88a9b6c402331f179f02a3b6c6 to your computer and use it in GitHub Desktop.
Fix flow optimisation range ('A, 1, 2', and 'A, 2, 3' --> 'A, 1, 3')
-- Drop every function this script defines (or used to define) so that it can be
-- re-run after a signature or return type change; presumably this is why the drops
-- exist, since CREATE OR REPLACE cannot change a function's return type.
-- Argument names are ignored by DROP FUNCTION; only the types identify the function.
DROP FUNCTION IF EXISTS public.flow_business_key(table_name TEXT);
DROP FUNCTION IF EXISTS public.flow_business_data(table_name TEXT);
DROP FUNCTION IF EXISTS public.flows();
DROP FUNCTION IF EXISTS public.fix_flows_successive_data(fix boolean);
DROP FUNCTION IF EXISTS public.fix_flow_successive_data(table_access TEXT, fix boolean);
DROP FUNCTION IF EXISTS public.do_fix_flow_successive_data(table_access TEXT, bkey TEXT[], data TEXT[], fix boolean);
DROP FUNCTION IF EXISTS public.NEW_fix_flows_successive_data(fix boolean);
DROP FUNCTION IF EXISTS public.NEW_fix_flow_successive_data(table_access TEXT, size bigint, fix boolean);
DROP FUNCTION IF EXISTS public.NEW_do_fix_flow_successive_data(table_access TEXT, temp_table TEXT, bkey TEXT[], data TEXT[], index bigint, size bigint, fix boolean);
-- successiveRecord is created at the end of this script and must be dropped too.
DROP FUNCTION IF EXISTS public.successiveRecord(record, record, text[]);
-- ----------------------------------------------------------------------
-- FLOWS
-- ----------------------------------------------------------------------
-- Return the business-key columns of a flow table, read from its "*_bkey" index.
--
-- The index definition is expected to end with "(<bkey columns>, mt, mvf)". The
-- columns before "mt, mvf" are returned as a text[]. Each matching index yields one
-- row; an index whose definition does not match the pattern yields none.
--
-- The "_" in the LIKE pattern is escaped. Unescaped, "_" is a single-character
-- wildcard, so '%_bkey' would also match index names such as "fooxbkey".
--
-- NOTE(review): pg_indexes is not filtered by schema, so same-named tables in
-- several schemas each contribute rows.
CREATE OR REPLACE FUNCTION public.flow_business_key(table_name TEXT)
RETURNS TABLE(bkey text[]) AS
$body$
SELECT ('{' || (regexp_matches(indexes.indexdef, '\((.*), mt, mvf\)'))[1] || '}')::text[]
FROM pg_indexes AS indexes
WHERE indexes.tablename = $1
  AND indexes.indexname LIKE '%\_bkey'
$body$
LANGUAGE sql;
-- Return, as a single-row text[], the "data" columns of a flow table: every column
-- except the technical ones (mvf, mvt, mt, mr, mm, mid, mn).
--
-- Each name is wrapped in double quotes, with embedded quotes doubled, so it can be
-- spliced into dynamic SQL as an identifier. Columns are listed in table
-- (ordinal_position) order so the result is deterministic.
--
-- NOTE(review): information_schema.columns is not filtered by schema; same-named
-- tables in several schemas would have their columns merged.
CREATE OR REPLACE FUNCTION public.flow_business_data(table_name TEXT)
RETURNS TABLE(data text[]) AS
$func$
DECLARE
    cols text[];
BEGIN
    cols := ARRAY(
        SELECT '"' || replace(columns.column_name, '"', '""') || '"'
        FROM information_schema.columns AS columns
        WHERE columns.table_name = $1
          AND columns.column_name NOT IN ('mvf', 'mvt', 'mt', 'mr', 'mm', 'mid', 'mn')
        ORDER BY columns.ordinal_position
    );
    RETURN QUERY SELECT cols;
END
$func$ LANGUAGE plpgsql;
-- List the flow tables to process.
--
-- A table qualifies when its name is 'flow_' followed by the flow_id (dashes
-- removed) of at least one injection whose status is 'DONE'. For each table this
-- returns:
--   - its "schema.table" name,
--   - its business-key columns (flow_business_key),
--   - its data columns (flow_business_data).
--
-- Both helpers are set-returning and are evaluated in the select list. Presumably,
-- under PostgreSQL 10+ lock-step SRF semantics, a missing/extra *_bkey index shows
-- up as a NULL bkey/data value — TODO confirm.
CREATE OR REPLACE FUNCTION public.flows()
RETURNS TABLE(table_access text, bkey text[], data text[]) AS
$body$
SELECT
    t.table_schema || '.' || t.table_name AS table_access,
    flow_business_key(t.table_name) AS bkey,
    flow_business_data(t.table_name) AS data
FROM information_schema.tables AS t
WHERE EXISTS (
    SELECT 1
    FROM injection AS i
    WHERE i.status = 'DONE'
      AND 'flow_' || REPLACE(i.flow_id, '-', '') = t.table_name
)
$body$
LANGUAGE sql;
-- ----------------------------------------------------------------------
-- FIX SUCCESSIVE
-- ----------------------------------------------------------------------
-- Run do_fix_flow_successive_data on every table listed by flows().
--
-- Returns one row per change reported by the worker, prefixed with the table it
-- belongs to. With fix = false the changes are only reported; with fix = true the
-- worker also applies them.
CREATE OR REPLACE FUNCTION public.fix_flows_successive_data(fix boolean DEFAULT false)
RETURNS TABLE(table_access text, mid text, new_mvf bigint) AS
$func$
DECLARE
    rec record;
    counter INTEGER := 0 ;
    total INTEGER := 0 ;
BEGIN
    total := (SELECT count(*)::INTEGER FROM public.flows());
    RAISE NOTICE 'Fixing successive on all tables [%]', total;
    FOR rec IN SELECT * FROM public.flows()
    LOOP
        counter := counter + 1 ;
        RAISE NOTICE ' [%/%] ...', counter, total;
        -- bkey/data can come back NULL from flows() (e.g. presumably when a table has
        -- no *_bkey index). Fail with a clear message rather than an opaque
        -- array-cast error later on.
        IF rec.bkey IS NULL OR rec.data IS NULL THEN
            RAISE EXCEPTION 'Cannot determine business key or data columns for table %', rec.table_access;
        END IF;
        -- Call the worker directly so the arrays are passed as typed values instead
        -- of being serialized into a dynamic SQL string and parsed back.
        RETURN QUERY
        SELECT rec.table_access, d.mid, d.new_mvf
        FROM public.do_fix_flow_successive_data(rec.table_access, rec.bkey, rec.data, fix) AS d;
    END LOOP;
END
$func$ LANGUAGE plpgsql;
-- Run do_fix_flow_successive_data on a single table.
--
-- table_access is the "schema.table" name. The "flows." prefix is stripped before
-- looking up the business-key and data columns. With fix = false changes are only
-- reported; with fix = true they are also applied.
CREATE OR REPLACE FUNCTION public.fix_flow_successive_data(table_access TEXT, fix boolean DEFAULT false)
RETURNS TABLE(mid text, new_mvf bigint) AS
$func$
DECLARE
    bare_name text := REPLACE(table_access, 'flows.', '');
    bkey text[];
    data text[];
BEGIN
    -- Non-strict SELECT INTO: keeps the first row, or NULL when there is none.
    SELECT k.bkey INTO bkey FROM public.flow_business_key(bare_name) AS k;
    SELECT d.data INTO data FROM public.flow_business_data(bare_name) AS d;
    IF bkey IS NULL OR data IS NULL THEN
        RAISE EXCEPTION 'Cannot determine business key or data columns for table %', table_access;
    END IF;
    -- Columns are qualified: mid/new_mvf are also this function's OUT variables.
    RETURN QUERY
    SELECT w.mid, w.new_mvf
    FROM public.do_fix_flow_successive_data(table_access, bkey, data, fix) AS w;
END
$func$ LANGUAGE plpgsql;
-- Merge successive rows of one flow table.
--
-- Rows are grouped by (mt, <data columns>) and walked in (mvf, mvt) order. When a
-- row starts right after the previous row of the same group
-- (previous.mvt + 1 = current.mvf), both belong to one series. The series head keeps
-- its mid and takes the mvt of the last row; the other rows are dropped.
--
-- Parameters:
--   table_access  "schema.table" name of the flow table
--   bkey          business-key columns; only used for logging here
--   data          double-quoted column names that must be equal for rows to merge
--   fix           false = report only; true = also DELETE/UPDATE rows of table_access
--
-- Returns one row per change:
--   (mid, NULL)     a row absorbed by its predecessor (deleted when fix)
--   (mid, new mvt)  a series head whose mvt is extended (updated when fix);
--                   the column is named new_mvf but carries the new mvt value.
--
-- Changes are collected in an ON COMMIT DROP temporary table created here, so the
-- function can only be called once per table within a transaction.
CREATE OR REPLACE FUNCTION public.do_fix_flow_successive_data(table_access TEXT, bkey TEXT[], data TEXT[], fix boolean DEFAULT false)
RETURNS TABLE(mid text, new_mvf bigint) AS
$func$
DECLARE
    temp_table text := format('temp_update_%s', REPLACE(table_access, 'flows.', ''));
    bkey_string text := array_to_string(bkey, ', ');
    -- Columns that must be equal for two rows to belong to the same series.
    group_cols text := array_to_string(ARRAY['mt'] || data, ', ');
    previous_row RECORD;
    current_row RECORD;
    has_previous boolean := false;
    merge_mid text;
    merge_mvt bigint;
    counter bigint := 0 ;
BEGIN
    IF data IS NULL THEN
        RAISE EXCEPTION 'data columns must not be NULL for table %', table_access;
    END IF;
    RAISE NOTICE ' Fixing successive on table [%] (%)', table_access, bkey_string;
    RAISE NOTICE ' Create temporary update for table [%]', table_access;
    -- %I keeps any remaining "schema." prefix inside the temp table name instead of
    -- reading it as a (non-temporary) schema.
    EXECUTE format('CREATE TEMPORARY TABLE %I (mid text, new_mvf bigint) ON COMMIT DROP', temp_table);
    RAISE NOTICE ' Open cursor on table [%]', table_access;
    -- see_next is NULL on the last row of a group: the following row starts a new group.
    FOR current_row IN EXECUTE format(
        'SELECT *, lead(mid, 1) OVER (PARTITION BY %s ORDER BY %s, mvf, mvt) AS see_next'
        ' FROM %s'
        ' ORDER BY %s, mvf, mvt',
        group_cols, group_cols, table_access, group_cols)
    LOOP
        IF NOT has_previous THEN
            RAISE NOTICE ' Starting process';
        ELSE
            counter := counter + 1;
            IF counter % 50000 = 0 THEN
                RAISE NOTICE ' Process row %', counter;
            END IF;
            IF previous_row.see_next IS NOT NULL AND previous_row.mvt + 1 = current_row.mvf THEN
                IF merge_mid IS NULL THEN
                    -- Start a new series headed by the previous row.
                    merge_mid := previous_row.mid;
                    merge_mvt := previous_row.mvt;
                END IF;
                EXECUTE format('INSERT INTO %I (mid, new_mvf) VALUES (%L, NULL)', temp_table, current_row.mid);
                IF fix THEN
                    EXECUTE format('DELETE FROM %s WHERE mid = %L', table_access, current_row.mid);
                END IF;
                merge_mvt := current_row.mvt;
            ELSIF merge_mid IS NOT NULL THEN
                -- Rupture: close the running series by extending its head.
                EXECUTE format('INSERT INTO %I (mid, new_mvf) VALUES (%L, %L)', temp_table, merge_mid, merge_mvt);
                IF fix THEN
                    EXECUTE format('UPDATE %s SET mvt = %L WHERE mid = %L', table_access, merge_mvt, merge_mid);
                END IF;
                merge_mvt := NULL;
                merge_mid := NULL;
            END IF;
        END IF;
        previous_row := current_row;
        has_previous := true;
    END LOOP;
    -- A series that runs up to the last row never meets a rupture inside the loop.
    -- Close it here; otherwise its rows are deleted but the head keeps its old mvt.
    IF merge_mid IS NOT NULL THEN
        EXECUTE format('INSERT INTO %I (mid, new_mvf) VALUES (%L, %L)', temp_table, merge_mid, merge_mvt);
        IF fix THEN
            EXECUTE format('UPDATE %s SET mvt = %L WHERE mid = %L', table_access, merge_mvt, merge_mid);
        END IF;
    END IF;
    RETURN QUERY EXECUTE format('SELECT * FROM %I', temp_table);
END
$func$ LANGUAGE plpgsql;
-- Batched variant of fix_flow_successive_data.
--
-- Processes table_access in windows of `size` rows through
-- NEW_do_fix_flow_successive_data. All changes are collected in one ON COMMIT DROP
-- temporary table created here, and that table is returned once at the end.
--
-- Parameters:
--   table_access  "schema.table" name; "flows." is stripped for metadata lookups
--   size          rows per batch; must be > 0
--   fix           false = report only; true = also apply the changes
--
-- NOTE(review): series that straddle a batch boundary are not merged across it.
CREATE OR REPLACE FUNCTION public.NEW_fix_flow_successive_data(table_access TEXT, size bigint, fix boolean DEFAULT false)
RETURNS TABLE(mid text, new_mvf bigint) AS
$func$
DECLARE
    bare_name text := REPLACE(table_access, 'flows.', '');
    temp_table text := format('temp_update_%s', REPLACE(table_access, 'flows.', ''));
    bkey text[];
    data text[];
    max_size bigint;
    batch_start bigint := 0;  -- start of the current batch in the original row numbering
    deleted bigint := 0;      -- rows removed by earlier batches (fix = true only)
BEGIN
    -- A non-positive size would never advance the loop below.
    IF size IS NULL OR size <= 0 THEN
        RAISE EXCEPTION 'size must be a positive number of rows, got %', size;
    END IF;
    SELECT k.bkey INTO bkey FROM public.flow_business_key(bare_name) AS k;
    SELECT d.data INTO data FROM public.flow_business_data(bare_name) AS d;
    EXECUTE format('select count(*) from %s', table_access)
    INTO max_size;
    RAISE NOTICE ' Create temporary update for table [%]', table_access;
    EXECUTE format(' CREATE TEMPORARY TABLE %s (mid text, new_mvf bigint) ON COMMIT DROP', temp_table);
    RAISE NOTICE 'Fixing table % with max size %', table_access, max_size;
    WHILE max_size > batch_start LOOP
        RAISE NOTICE ' index [%/%] ...', batch_start, max_size;
        -- Rows deleted by earlier batches no longer occupy an OFFSET slot. Shift the
        -- offset back by that count so that no row is skipped.
        PERFORM * FROM public.NEW_do_fix_flow_successive_data(
            table_access, temp_table, bkey, data, batch_start - deleted, size, fix);
        IF fix THEN
            -- Each deleted row was recorded in the temp table with a NULL new_mvf.
            EXECUTE format('SELECT count(*) FROM %s WHERE new_mvf IS NULL', temp_table)
            INTO deleted;
        END IF;
        batch_start := batch_start + size;
    END LOOP;
    -- The worker returns the whole (cumulative) temp table on every call. Return it
    -- once here instead of once per batch, which duplicated earlier rows.
    RETURN QUERY EXECUTE format('SELECT mid, new_mvf FROM %s', temp_table);
END
$func$ LANGUAGE plpgsql;
-- Process one batch for NEW_fix_flow_successive_data.
--
-- Reads `size` rows of table_access starting at offset `index`, in
-- (bkey columns, mt, mvf) order. Adjacent rows for which successiveRecord(...)
-- holds are merged. Changes are recorded in temp_table, which must already exist
-- (this function does not create it).
--
-- Recorded rows:
--   (mid, NULL)     row absorbed by its predecessor (deleted when fix)
--   (mid, new mvt)  series head whose mvt is extended (updated when fix)
--
-- Returns the whole content of temp_table, not only this batch's rows.
CREATE OR REPLACE FUNCTION public.NEW_do_fix_flow_successive_data(table_access TEXT, temp_table text, bkey TEXT[], data TEXT[], index bigint, size bigint, fix boolean DEFAULT false)
RETURNS TABLE(mid text, new_mvf bigint) AS
$func$
DECLARE
    bkey_string text := array_to_string(bkey, ', ');
    previous_row record;
    current_row record;
    has_previous boolean := false;
    merge_mid text;
    merge_mvt bigint;
    counter bigint := 0 ;
BEGIN
    RAISE NOTICE ' Fixing successive on table [%] (%)', table_access, bkey_string;
    RAISE NOTICE ' Open cursor on table [%]', table_access;
    FOR current_row IN EXECUTE format(
        'SELECT * FROM %s ORDER BY %s, mt, mvf offset %s rows fetch first %s rows only',
        table_access, bkey_string, index, size)
    LOOP
        IF NOT has_previous THEN
            RAISE NOTICE ' Starting process';
        ELSE
            counter := counter + 1;
            IF counter % 50000 = 0 THEN
                RAISE NOTICE ' Process row %', counter;
            END IF;
            IF successiveRecord(previous_row, current_row, data) THEN
                IF merge_mid IS NULL THEN
                    -- Start a new series headed by the previous row.
                    merge_mid := previous_row.mid;
                    merge_mvt := previous_row.mvt;
                END IF;
                EXECUTE format('INSERT INTO %s VALUES (%L::text, NULL)', temp_table, current_row.mid);
                IF fix THEN
                    EXECUTE format('DELETE FROM %s WHERE mid = %L', table_access, current_row.mid);
                END IF;
                merge_mvt := current_row.mvt;
            ELSIF merge_mid IS NOT NULL THEN
                -- Rupture: close the running series by extending its head.
                EXECUTE format('INSERT INTO %s VALUES (%L::text, %L)', temp_table, merge_mid, merge_mvt);
                IF fix THEN
                    EXECUTE format('UPDATE %s SET mvt = %L WHERE mid = %L', table_access, merge_mvt, merge_mid);
                END IF;
                merge_mvt := NULL;
                merge_mid := NULL;
            END IF;
        END IF;
        previous_row := current_row;
        has_previous := true;
    END LOOP;
    -- A series that runs up to the last row of the batch never meets a rupture inside
    -- the loop. Close it here; otherwise its rows are deleted but the head keeps its
    -- old mvt.
    IF merge_mid IS NOT NULL THEN
        EXECUTE format('INSERT INTO %s VALUES (%L::text, %L)', temp_table, merge_mid, merge_mvt);
        IF fix THEN
            EXECUTE format('UPDATE %s SET mvt = %L WHERE mid = %L', table_access, merge_mvt, merge_mid);
        END IF;
    END IF;
    RETURN QUERY EXECUTE(format('SELECT * FROM %s', temp_table));
END
$func$ LANGUAGE plpgsql;
-- Decide whether `second` directly continues `first`. All three must hold:
--   - both rows have the same mt,
--   - second.mvf = first.mvt + 1,
--   - every column listed in `popos` has the same value in both rows.
--
-- popos entries may be double-quoted identifiers, as built by flow_business_data
-- ('"col"'). The quotes are removed before the name is used as a JSON key;
-- otherwise the lookup yields NULL and the column is silently never compared.
-- Values are compared with IS DISTINCT FROM, so NULL vs non-NULL counts as a
-- difference.
CREATE OR REPLACE FUNCTION public.successiveRecord(first record, second record, popos text[])
RETURNS bool AS
$func$
DECLARE
    c text;
    col_name text;
    first_json json;
    second_json json;
BEGIN
    -- IS NOT TRUE also rejects NULL mt/mvt/mvf, like the original ELSE branch did.
    IF (first.mt = second.mt AND first.mvt + 1 = second.mvf) IS NOT TRUE THEN
        RETURN false;
    END IF;
    first_json := to_json(first);
    second_json := to_json(second);
    FOREACH c IN ARRAY popos LOOP
        -- Turn a quoted identifier ("a""b") back into the raw column name (a"b).
        IF length(c) >= 2 AND left(c, 1) = '"' AND right(c, 1) = '"' THEN
            col_name := replace(substr(c, 2, length(c) - 2), '""', '"');
        ELSE
            col_name := c;
        END IF;
        IF (first_json ->> col_name) IS DISTINCT FROM (second_json ->> col_name) THEN
            RETURN false;
        END IF;
    END LOOP;
    RETURN true;
END
$func$ LANGUAGE plpgsql;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment