Skip to content

Instantly share code, notes, and snippets.

@Nhoutain
Last active September 2, 2021 08:09
Show Gist options
  • Save Nhoutain/99c50bdd9a2b31155a4c62bd6de559c4 to your computer and use it in GitHub Desktop.
Save Nhoutain/99c50bdd9a2b31155a4c62bd6de559c4 to your computer and use it in GitHub Desktop.
Duplicate data on mvt
-- Remove any previous version of the functions defined in this script.
-- CREATE OR REPLACE cannot change a function's result columns, so an older
-- definition is dropped first. DROP FUNCTION only needs the argument types
-- (parameter names are ignored), and callers are listed before the functions
-- they call.
DROP FUNCTION IF EXISTS public.fix_flows_duplicate_data(boolean);
DROP FUNCTION IF EXISTS public.fix_flow_duplicate_data(text, boolean);
DROP FUNCTION IF EXISTS public.fix_flow_bkey_duplicate_data(text, text[], boolean);
DROP FUNCTION IF EXISTS public.flows();
DROP FUNCTION IF EXISTS public.flow_business_key(text);
-- ----------------------------------------------------------------------
-- FLOWS
-- ----------------------------------------------------------------------
-- Return the business-key columns of a flow table.
--
-- Looks in pg_indexes for an index on table_name whose name ends in "_bkey"
-- and whose column list ends in "..., mt, mvf)". The columns listed before
-- "mt, mvf" are returned as a text[] (e.g. {tenant,code}). A table with no
-- such index yields no row.
--
-- NOTE(review): pg_indexes is not filtered by schema, so a table name that
-- exists in several schemas can yield several rows. Callers only pass the
-- bare table name.
CREATE OR REPLACE FUNCTION public.flow_business_key(table_name TEXT)
RETURNS TABLE(bkey text[]) AS
$body$
SELECT ('{' || (regexp_matches(indexes.indexdef, '\((.*), mt, mvf\)'))[1] || '}')::text[]
FROM pg_indexes AS indexes
WHERE indexes.tablename = $1
  -- Anchored regex on the literal suffix: in LIKE '%_bkey' the "_" is a
  -- wildcard, which would also accept names such as "flow_xbkey".
  AND indexes.indexname ~ '_bkey$'
$body$
LANGUAGE sql;
-- List the flow tables to process, each with its business key.
--
-- A table qualifies when its name is 'flow_' followed by the flow_id (with
-- dashes removed) of an injection whose status is 'DONE'. Each qualifying
-- table is returned as 'schema.table', together with the key columns
-- reported by flow_business_key. A table for which flow_business_key yields
-- no row does not appear in the result.
CREATE OR REPLACE FUNCTION public.flows()
RETURNS TABLE(table_access text, bkey text[]) AS
$body$
SELECT
    t.table_schema || '.' || t.table_name AS table_access,
    flow_business_key(t.table_name)       AS bkey
FROM information_schema.tables AS t
WHERE EXISTS (
    SELECT 1
    FROM injection AS i
    WHERE i.status = 'DONE'
      AND t.table_name = 'flow_' || REPLACE(i.flow_id, '-', '')
)
$body$
LANGUAGE sql;
-- ----------------------------------------------------------------------
-- FIX DUPLICATE
-- ----------------------------------------------------------------------
-- Run the overlap fixer on every flow table returned by flows().
--
-- For each (table_access, bkey) row of flows(), calls
-- public.fix_flow_bkey_duplicate_data and prefixes every returned change with
-- the table it belongs to.
--
-- Parameters:
--   fix  when true, the changes are applied; when false (default) or NULL,
--        they are only reported.
CREATE OR REPLACE FUNCTION public.fix_flows_duplicate_data(fix boolean DEFAULT false)
RETURNS TABLE(table_access text, mid text, old_mvt bigint, new_mvt bigint) AS
$func$
DECLARE
    rec     record;
    counter INTEGER := 0;
    total   INTEGER := 0;
BEGIN
    -- NOTE(review): flows() is evaluated twice (here, then in the loop);
    -- this assumes the set of DONE injections does not change meanwhile.
    total := (SELECT count(*)::INTEGER FROM flows());
    RAISE NOTICE 'Fixing duplicate on all tables [%]', total;
    FOR rec IN SELECT * FROM flows()
    LOOP
        counter := counter + 1;
        RAISE NOTICE ' [%/%] ...', counter, total;
        -- Static call: bkey is passed as a real text[] and a NULL fix is
        -- passed through as NULL. Result columns are qualified with "f."
        -- because mid/old_mvt/new_mvt are also this function's OUT columns.
        RETURN QUERY
            SELECT rec.table_access, f.mid, f.old_mvt, f.new_mvt
            FROM public.fix_flow_bkey_duplicate_data(rec.table_access, rec.bkey, fix) AS f;
    END LOOP;
END
$func$ LANGUAGE plpgsql;
-- Fix overlapping mvt ranges in a single flow table, or, with fix = false,
-- only report them.
--
-- Parameters:
--   table_access  table reference, e.g. 'flows.flow_<id>'; any schema prefix
--                 is stripped to look up the business key.
--   fix           when true, the changes are applied; when false (default),
--                 they are only reported.
--
-- Returns the (mid, old_mvt, new_mvt) rows produced by
-- public.fix_flow_bkey_duplicate_data.
--
-- Raises an exception when public.flow_business_key finds no business key
-- for the table.
CREATE OR REPLACE FUNCTION public.fix_flow_duplicate_data(table_access TEXT, fix boolean DEFAULT false)
RETURNS TABLE(mid text, old_mvt bigint, new_mvt bigint) AS
$func$
DECLARE
    bkey text[];
BEGIN
    -- Strip everything up to the last '.', so any schema prefix works,
    -- not only 'flows.'.
    SELECT k.bkey
      INTO bkey
      FROM public.flow_business_key(regexp_replace(table_access, '^.*[.]', '')) AS k;

    IF bkey IS NULL THEN
        RAISE EXCEPTION 'No business key (*_bkey index) found for table [%]', table_access;
    END IF;

    -- Columns qualified with "f." because mid/old_mvt/new_mvt are also this
    -- function's OUT columns.
    RETURN QUERY
        SELECT f.mid, f.old_mvt, f.new_mvt
        FROM public.fix_flow_bkey_duplicate_data(table_access, bkey, fix) AS f;
END
$func$ LANGUAGE plpgsql;
-- Detect, and optionally fix, overlapping mvf..mvt ranges in one flow table.
--
-- Parameters:
--   table_access  table reference such as 'flows.flow_<id>'. It is spliced
--                 verbatim into dynamic SQL, so it must be a trusted, valid
--                 (optionally schema-qualified) table reference.
--   bkey          business-key column names. Rows are grouped by (bkey, mt)
--                 and walked in (bkey, mt, mvf, mvt) order. The names are also
--                 spliced verbatim into dynamic SQL.
--   fix           when true, the table is UPDATEd; when false (default) or
--                 NULL, the changes are only reported.
--
-- For each pair of consecutive rows prev/cur inside the same (bkey, mt) group:
--   1. if prev's (possibly already adjusted) mvt is greater than cur.mvt,
--      cur.mvt is raised to prev's mvt;
--   2. then, if prev's mvt is >= cur.mvf, prev.mvt is lowered to cur.mvf - 1.
--
-- Returns one row (mid, old_mvt, new_mvt) per change, in the order the
-- changes were decided. The same rows are left in the temporary table
-- temp_update_<bare table name>, which is dropped at commit.
--
-- Raises an exception when bkey is NULL or contains no column name.
CREATE OR REPLACE FUNCTION public.fix_flow_bkey_duplicate_data(table_access TEXT, bkey TEXT[], fix boolean DEFAULT false)
RETURNS TABLE(mid text, old_mvt bigint, new_mvt bigint) AS
$func$
DECLARE
    -- Built from the bare table name: a temporary table cannot be placed in
    -- a regular schema, so any 'schema.' prefix is stripped (not only 'flows.').
    temp_table   text := format('temp_update_%s', regexp_replace(table_access, '^.*[.]', ''));
    flow_cursor  REFCURSOR;
    previous_row RECORD;
    current_row  RECORD;
    previous_mvt bigint;
    current_mvt  bigint;
    bkey_string  text := array_to_string(bkey, ', ');
    counter      bigint := 0;
BEGIN
    IF coalesce(bkey_string, '') = '' THEN
        RAISE EXCEPTION 'No business key given for table [%]', table_access;
    END IF;

    RAISE NOTICE ' Fixing duplicate on table [%] (%)', table_access, bkey_string;
    RAISE NOTICE ' Create temporary update for table [%]', table_access;
    -- ON COMMIT DROP only removes the table at commit. Reuse (and empty) a
    -- table left by an earlier call in the same transaction (e.g. a dry run
    -- followed by fix = true) instead of failing with "relation already exists".
    EXECUTE format('CREATE TEMPORARY TABLE IF NOT EXISTS %I (mid text, old_mvt bigint, new_mvt bigint) ON COMMIT DROP', temp_table);
    EXECUTE format('TRUNCATE pg_temp.%I', temp_table);

    RAISE NOTICE ' Open cursor on table [%]', table_access;
    -- see_next is the next row's mid within the same (bkey, mt) group, and is
    -- NULL on the group's last row. It would also be NULL if the next row's
    -- mid were NULL; presumably mid is never NULL - TODO confirm.
    OPEN flow_cursor FOR EXECUTE format(
        'SELECT *, lead(mid, 1) over (partition by %s, mt order by %s, mt, mvf, mvt) as see_next'
        ' FROM %s '
        ' ORDER BY %s, mt, mvf, mvt', bkey_string, bkey_string, table_access, bkey_string);

    FETCH NEXT FROM flow_cursor INTO current_row;
    IF FOUND THEN
        current_mvt := current_row.mvt;
        RAISE NOTICE ' Starting process';
        LOOP
            previous_row := current_row;
            previous_mvt := current_mvt;   -- carries any adjustment made while it was "current"
            FETCH NEXT FROM flow_cursor INTO current_row;
            EXIT WHEN NOT FOUND;
            current_mvt := current_row.mvt;

            counter := counter + 1;
            IF counter % 50000 = 0 THEN
                RAISE NOTICE ' Process row %', counter;
            END IF;

            -- Only compare rows that belong to the same (bkey, mt) group.
            IF previous_row.see_next IS NOT NULL THEN
                -- prev ends after cur ends: extend cur up to prev's end.
                IF previous_mvt > current_row.mvt THEN
                    EXECUTE format('INSERT INTO %I (mid, old_mvt, new_mvt) VALUES ($1, $2, $3)', temp_table)
                        USING current_row.mid::text, current_row.mvt::bigint, previous_mvt;
                    IF fix THEN
                        EXECUTE format('UPDATE %s SET mvt = $1 WHERE mid = $2', table_access)
                            USING previous_mvt, current_row.mid;
                    END IF;
                    current_mvt := previous_mvt;
                END IF;
                -- prev still reaches into cur: end prev just before cur starts.
                IF previous_mvt >= current_row.mvf THEN
                    EXECUTE format('INSERT INTO %I (mid, old_mvt, new_mvt) VALUES ($1, $2, $3)', temp_table)
                        USING previous_row.mid::text, previous_mvt, (current_row.mvf - 1)::bigint;
                    IF fix THEN
                        EXECUTE format('UPDATE %s SET mvt = $1 WHERE mid = $2', table_access)
                            USING current_row.mvf - 1, previous_row.mid;
                    END IF;
                    previous_mvt := current_row.mvf - 1;
                END IF;
            END IF;
        END LOOP;
    END IF;
    CLOSE flow_cursor;

    RETURN QUERY EXECUTE format('SELECT mid, old_mvt, new_mvt FROM %I', temp_table);
END
$func$ LANGUAGE plpgsql;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment