Last active
January 27, 2022 21:17
-
-
Save mgagliardo91/e4899e1fb9f924db5710c5312223c885 to your computer and use it in GitHub Desktop.
Timescale Deletions
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Walks the tenant's staging table one month at a time and, for each month, reports the DELETE
-- that removes staging rows which are exact duplicates (same source_id, label, time and data) of
-- rows already present in the tenant's source_data table. Removing them up front keeps the
-- backfill script from decompressing chunks only to upsert identical data points.
--
-- Parameters:
--   tenant  schema that holds both source_data and source_data_staging. It is interpolated with
--           %I, so it must be the exact schema name.
--
-- Notes:
--   * The procedure deliberately stays a dry run: every DELETE is only printed with RAISE NOTICE.
--     To apply the deletions, replace the final RAISE NOTICE '%', delete_sql with EXECUTE delete_sql.
--   * Statistics are refreshed with ANALYZE rather than VACUUM: PostgreSQL rejects VACUUM when it
--     is executed from a function or procedure, so the previous VACUUM call aborted the procedure.
--   * The DELETE joins its target (sds) directly to source_data. The earlier form listed the
--     staging table a second time in USING without correlating it to the target, which would have
--     deleted every staging row as soon as a single duplicate existed in the window.
CREATE OR REPLACE PROCEDURE public.clean_staging_duplicates(
    tenant text
) AS $proc$
DECLARE
    earliest_time timestamptz;  -- oldest timestamp present in staging
    latest_time   timestamptz;  -- newest timestamp present in staging
    time_range    timestamptz;  -- inclusive start of the current one-month window
    window_end    timestamptz;  -- exclusive end of the current one-month window
    staging_table text;         -- quoted "<tenant>.source_data_staging"
    source_table  text;         -- quoted "<tenant>.source_data"
    delete_sql    text;         -- DELETE statement generated for the current window
BEGIN
    staging_table := FORMAT('%I.source_data_staging', tenant);
    source_table  := FORMAT('%I.source_data', tenant);

    EXECUTE FORMAT('SELECT min(time), max(time) FROM %s', staging_table)
        INTO earliest_time, latest_time;

    IF earliest_time IS NULL THEN
        RAISE NOTICE 'No data in staging for tenant %', tenant;
        RETURN;
    END IF;

    RAISE NOTICE 'Analyzing tenant staging %.source_data_staging', tenant;
    EXECUTE FORMAT('ANALYZE VERBOSE %s', staging_table);

    RAISE NOTICE 'early=% latest=%', earliest_time, latest_time;

    FOR time_range IN
        SELECT generate_series(earliest_time, latest_time, '1 month'::INTERVAL)
    LOOP
        window_end := time_range + '1 month'::INTERVAL;
        RAISE NOTICE 'Working on time range % -> %', time_range, window_end;

        -- Half-open window [time_range, window_end) so consecutive months never overlap.
        -- The bounds are repeated on sd.time (always equal to sds.time here) so the planner can
        -- restrict the scan of source_data to the same window.
        delete_sql := FORMAT($sql$
            DELETE FROM %1$s AS sds
            USING %2$s AS sd
            WHERE sds.source_id = sd.source_id
              AND sds.label = sd.label
              AND sds.time = sd.time
              AND sds.data = sd.data
              AND sds.time >= %3$L
              AND sds.time < %4$L
              AND sd.time >= %3$L
              AND sd.time < %4$L;
        $sql$, staging_table, source_table, time_range, window_end);

        -- Dry run only: print the statement instead of executing it.
        RAISE NOTICE '%', delete_sql;
    END LOOP;
END;
$proc$
LANGUAGE PLPGSQL;
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Deletes the data points of one series (source_id/label) in the half-open time range
-- [newer_than, older_than) from the tenant's source_data hypertable, decompressing as few chunks
-- as possible.
--
-- Sequence:
--   1. Fetch all chunks of <tenant>.source_data around the newer_than -> older_than range
--      (the show_chunks window is widened by 3 days on each side; non-overlapping chunks are
--      skipped afterwards using the chunk's range_start/range_end).
--   2. For each chunk:
--      a. Not compressed: DELETE the series rows inside the time range from the chunk.
--      b. Compressed and the chunk lies entirely inside the range: DELETE every occurrence of
--         the series straight from the compressed chunk table.
--      c. Compressed and only partly inside the range: look up the earliest/latest point of the
--         series in the chunk. If any point falls outside the range (and so must survive),
--         decompress the chunk and DELETE by time range; otherwise DELETE the whole series from
--         the compressed chunk table as in (b).
--
-- Parameters:
--   tenant      schema that holds the source_data hypertable (interpolated with %I, so it must
--               be the exact schema name)
--   source_id   series identifier to delete
--   label       series label to delete
--   newer_than  inclusive lower bound of the range to delete
--   older_than  exclusive upper bound of the range to delete
--   dry_run     when true (the default) statements are only printed with RAISE NOTICE
--
-- Fixes over the previous revision:
--   * the whole-series DELETE compared label against the source_id value (%3$s used twice);
--   * the time-range DELETEs used newer_than for both bounds (%5$L twice) and so matched nothing;
--   * the uncompressed-chunk notice printed data_chunk, which is unset/stale in that branch;
--   * a point exactly at older_than (outside the delete range) no longer allows the whole-series
--     delete on a compressed chunk;
--   * identifiers and values are quoted with %I / %L instead of being pasted in with %s.
CREATE OR REPLACE PROCEDURE public.delete_source_data(
    tenant text,
    source_id text,
    label text,
    newer_than timestamptz,
    older_than timestamptz,
    dry_run boolean default true
) AS $proc$
DECLARE
    full_chunk text;
    schema text; -- schema of the chunk
    chunk_table_name text; -- name of the chunk
    chunk_information timescaledb_information.chunks; -- information about the chunk
    chunk _timescaledb_catalog.chunk; -- the non-compressed chunk
    data_chunk _timescaledb_catalog.chunk; -- the compressed chunk containing the actual data
    earliest_point timestamptz; -- earliest point of the series inside the chunk
    latest_point timestamptz; -- latest point of the series inside the chunk
    chunk_ident text; -- quoted schema.table of the non-compressed chunk
    series_filter text; -- "source_id = ... AND label = ..." with quoted literals
    range_delete_sql text; -- DELETE of the series restricted to [newer_than, older_than)
    series_delete_sql text; -- DELETE of the whole series from the compressed chunk table
    decompress_sql text; -- decompress_chunk call for the current chunk
    needs_decompression boolean;
BEGIN
    series_filter := FORMAT('source_id = %L AND label = %L', source_id, label);

    FOR full_chunk IN
        SELECT public.show_chunks(FORMAT('%I.source_data', tenant)::regclass, older_than => older_than + '3 days'::INTERVAL, newer_than => newer_than - '3 days'::INTERVAL)
    LOOP
        -- NOTE(review): relies on the regclass text being schema-qualified and unquoted, which
        -- holds as long as the chunk schema is not on the search_path — confirm for your setup.
        SELECT split_part(full_chunk, '.', 1) INTO schema;
        SELECT split_part(full_chunk, '.', 2) INTO chunk_table_name;
        SELECT c.* INTO chunk_information FROM timescaledb_information.chunks c WHERE c.chunk_schema = schema AND c.chunk_name = chunk_table_name;
        SELECT c.* INTO chunk FROM _timescaledb_catalog.chunk c WHERE c.schema_name = schema AND c.table_name = chunk_table_name;

        -- Fail loudly instead of generating a malformed DELETE when the catalog lookup misses.
        IF chunk.table_name IS NULL OR chunk_information.range_start IS NULL THEN
            RAISE EXCEPTION 'Could not resolve chunk % in the TimescaleDB catalog', full_chunk;
        END IF;

        IF chunk_information.range_end <= newer_than OR chunk_information.range_start >= older_than THEN
            RAISE NOTICE 'Skipping chunk due to no data points in bound: %.% (% -> %)', chunk.schema_name, chunk.table_name, chunk_information.range_start, chunk_information.range_end;
            CONTINUE;
        END IF;

        chunk_ident := FORMAT('%I.%I', chunk.schema_name, chunk.table_name);
        range_delete_sql := FORMAT('DELETE FROM %s WHERE %s AND time >= %L AND time < %L;', chunk_ident, series_filter, newer_than, older_than);

        IF chunk.compressed_chunk_id IS NOT NULL THEN
            -- Chunk is compressed, lets see what we have to do
            SELECT c.* INTO data_chunk FROM _timescaledb_catalog.chunk c WHERE c.id = chunk.compressed_chunk_id;
            RAISE NOTICE 'Working through compressed chunk: %.% (% -> %)', data_chunk.schema_name, data_chunk.table_name, chunk_information.range_start, chunk_information.range_end;

            -- Assumes source_id and label exist as plain columns on the compressed chunk table
            -- (i.e. they are segment-by columns) — TODO confirm against the compression settings.
            series_delete_sql := FORMAT('DELETE FROM %I.%I WHERE %s;', data_chunk.schema_name, data_chunk.table_name, series_filter);

            IF chunk_information.range_start >= newer_than AND chunk_information.range_end <= older_than THEN
                -- Chunk range [range_start, range_end) lies entirely inside the delete range,
                -- so every point of the series in this chunk goes away.
                needs_decompression := false;
            ELSE
                -- Edge of request falls in the chunk: find out whether any point of the series
                -- lies outside [newer_than, older_than) and therefore has to be kept.
                EXECUTE FORMAT('SELECT time FROM %s WHERE %s ORDER BY time ASC LIMIT 1', chunk_ident, series_filter) INTO earliest_point;
                EXECUTE FORMAT('SELECT time FROM %s WHERE %s ORDER BY time DESC LIMIT 1', chunk_ident, series_filter) INTO latest_point;

                -- NULL points (series absent from the chunk) never require decompression.
                -- Points in a chunk always lie within its range, so comparing the points alone
                -- is sufficient; older_than is exclusive, hence >=.
                needs_decompression := COALESCE(earliest_point < newer_than, false)
                                    OR COALESCE(latest_point >= older_than, false);
            END IF;

            IF needs_decompression THEN
                RAISE NOTICE 'Chunk requires decompression in order to delete';
                decompress_sql := FORMAT('SELECT public.decompress_chunk(%L);', chunk_ident);
                IF dry_run THEN
                    RAISE NOTICE 'Dry-Run: %', decompress_sql;
                    RAISE NOTICE 'Dry-Run: %', range_delete_sql;
                ELSE
                    EXECUTE decompress_sql;
                    EXECUTE range_delete_sql;
                END IF;
            ELSE
                -- Every point of the series in this chunk is inside the range: delete the whole
                -- series from the compressed chunk table without decompressing.
                IF dry_run THEN
                    RAISE NOTICE 'Dry-Run: %', series_delete_sql;
                ELSE
                    EXECUTE series_delete_sql;
                END IF;
            END IF;
        ELSE
            -- Chunk is not compressed, just delete like normal
            RAISE NOTICE 'Updating series in uncompressed chunk: %.% (% -> %)', chunk.schema_name, chunk.table_name, chunk_information.range_start, chunk_information.range_end;
            IF dry_run THEN
                RAISE NOTICE 'Dry-Run: %', range_delete_sql;
            ELSE
                EXECUTE range_delete_sql;
            END IF;
            -- NOTE(review): as before, only this branch commits per chunk; work done on
            -- compressed chunks is committed by a later COMMIT or at the end of the CALL.
            COMMIT;
        END IF;
    END LOOP;
END;
$proc$
LANGUAGE PLPGSQL;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
The above has no side effects in its current state; it will require switching the
`RAISE NOTICE '%', FORMAT` commands to `EXECUTE FORMAT` and uncommenting the `-- COMMIT;`
to make this work. An example of usage: