Skip to content

Instantly share code, notes, and snippets.

@mgagliardo91
Last active January 27, 2022 21:17
Show Gist options
  • Save mgagliardo91/e4899e1fb9f924db5710c5312223c885 to your computer and use it in GitHub Desktop.
Save mgagliardo91/e4899e1fb9f924db5710c5312223c885 to your computer and use it in GitHub Desktop.
Timescale Deletions
-- The following procedure walks the staging table one month at a time and builds, for each month, a DELETE that
-- removes staging rows that are exact duplicates (same source_id, label, time and data) of rows already in the
-- source table. This keeps the backfill script from decompressing chunks just to upsert the same data point, and
-- should reduce the number of chunks that need to be decompressed.
--
-- NOTE: in its current state the procedure does not modify any rows: each DELETE is only printed via
-- RAISE NOTICE. Switch RAISE NOTICE '%', FORMAT(...) to EXECUTE FORMAT(...) to actually apply it.
--
-- tenant: name of the schema that holds both source_data_staging and source_data.
CREATE OR REPLACE PROCEDURE public.clean_staging_duplicates(
    tenant text
) AS $proc$
DECLARE
    earliest_time timestamptz;
    latest_time timestamptz;
    time_range timestamptz;  -- inclusive start of the current one-month window
    window_end timestamptz;  -- exclusive end of the current one-month window
BEGIN
    -- One scan for both bounds; min/max also ignore NULL times (ORDER BY ... DESC LIMIT 1 would return a NULL first).
    EXECUTE FORMAT($$SELECT min(time), max(time) FROM %I.source_data_staging$$, tenant)
        INTO earliest_time, latest_time;
    IF earliest_time IS NULL THEN
        RAISE NOTICE 'No data in staging for tenant %', tenant;
        RETURN;
    END IF;

    -- VACUUM cannot be executed from inside a function/procedure (it errors out); ANALYZE can, and refreshes the
    -- planner statistics the duplicate-detecting join relies on. Run VACUUM separately beforehand if it is needed.
    RAISE NOTICE 'ANALYZING tenant staging %.source_data_staging', tenant;
    EXECUTE FORMAT($$ANALYZE VERBOSE %I.source_data_staging$$, tenant);

    RAISE NOTICE 'early=% latest=%', earliest_time, latest_time;
    -- Each window is half-open [time_range, time_range + 1 month); consecutive windows tile the range exactly,
    -- and the last one still covers latest_time because generate_series stops at the last start <= latest_time.
    FOR time_range IN SELECT generate_series(earliest_time, latest_time, '1 month'::INTERVAL)
    LOOP
        window_end := time_range + '1 month'::INTERVAL;
        RAISE NOTICE 'Working on time range % -> %', time_range, window_end;
        -- The DELETE is correlated row-by-row with source_data: only staging rows whose (source_id, label, time,
        -- data) already exist there are removed. The extra bound on sd.time lets the planner exclude chunks.
        RAISE NOTICE '%', FORMAT($$
DELETE FROM %1$I.source_data_staging AS sds
USING %1$I.source_data AS sd
WHERE sds.source_id = sd.source_id
  AND sds.label = sd.label
  AND sds.time = sd.time
  AND sds.data = sd.data
  AND sds.time >= %2$L AND sds.time < %3$L
  AND sd.time >= %2$L AND sd.time < %3$L;
$$, tenant, time_range, window_end);
    END LOOP;
END;
$proc$
LANGUAGE PLPGSQL;
-- The following procedure deletes the points of one series (source_id, label) that fall in the half-open window
-- [newer_than, older_than) from <tenant>.source_data, using the minimal amount of decompression.
-- The sequence is as follows:
-- 1. Fetch the tenant's chunks that may contain the newer_than -> older_than range (show_chunks is padded by
--    3 days on each side) and skip those whose [range_start, range_end) does not overlap the window.
-- 2. For each remaining chunk:
--    a. Determine if the chunk is compressed.
--    b. If not compressed, simply DELETE the series' points inside the window from the chunk.
--    c. If compressed:
--       i.   Check whether the chunk range lies entirely inside the window (then every occurrence of the
--            source_id/label in the chunk is to be deleted).
--       ii.  If so, simply DELETE the series' rows from the compressed chunk.
--       iii. Otherwise look at the series' earliest/latest point in the chunk. If some point lies outside the
--            window (and must be kept), decompress the chunk and then DELETE only the points inside the window;
--            if every point lies inside, DELETE from the compressed chunk as in ii.
-- NOTE(review): deleting from the compressed chunk by source_id/label assumes both are compression segmentby
--   columns (stored as plain columns in the compressed chunk) — confirm against the compression settings.
-- NOTE(review): a chunk decompressed here is not recompressed by this procedure; presumably a compression
--   policy takes care of it — confirm.
--
-- Parameters:
--   tenant      schema that holds the source_data hypertable
--   source_id   series identifier whose points are deleted
--   label       series label whose points are deleted
--   newer_than  inclusive lower bound of the window (must not be NULL)
--   older_than  exclusive upper bound of the window (must not be NULL)
--   dry_run     when true (the default) the statements are only printed via RAISE NOTICE and nothing changes
-- When dry_run is false each chunk's work is committed on its own, so CALL this outside an explicit transaction.
CREATE OR REPLACE PROCEDURE public.delete_source_data(
    tenant text,
    source_id text,
    label text,
    newer_than timestamptz,
    older_than timestamptz,
    dry_run boolean default true
) AS $proc$
DECLARE
    full_chunk text;
    schema text; -- schema of the chunk
    chunk_table_name text; -- name of the chunk
    chunk_information timescaledb_information.chunks; -- information about the chunk
    chunk _timescaledb_catalog.chunk; -- the non-compressed chunk
    data_chunk _timescaledb_catalog.chunk; -- the compressed chunk backing `chunk`, when it is compressed
    earliest_point timestamptz;
    latest_point timestamptz;
    needs_decompression boolean;
    window_delete text; -- DELETE of the series' points inside the window, against the non-compressed chunk
    stmts text[]; -- statements for the current chunk, printed (dry run) or executed in order
    stmt text;
BEGIN
    -- With a NULL bound every range comparison below is NULL, which would fall through to deleting the whole
    -- series from compressed chunks; refuse it outright.
    IF newer_than IS NULL OR older_than IS NULL THEN
        RAISE EXCEPTION 'newer_than and older_than must both be non-NULL (got % -> %)', newer_than, older_than;
    END IF;
    IF newer_than >= older_than THEN
        RAISE NOTICE 'Empty time range % -> %, nothing to delete', newer_than, older_than;
        RETURN;
    END IF;

    FOR full_chunk IN
        SELECT public.show_chunks(
            FORMAT('%I.source_data', tenant)::regclass,
            older_than => older_than + '3 days'::INTERVAL,
            newer_than => newer_than - '3 days'::INTERVAL)
    LOOP
        -- Resolve schema/table through the catalog instead of splitting the (possibly quoted) name on '.'.
        SELECT n.nspname, c.relname
          INTO schema, chunk_table_name
          FROM pg_catalog.pg_class AS c
          INNER JOIN pg_catalog.pg_namespace AS n ON n.oid = c.relnamespace
         WHERE c.oid = full_chunk::regclass;
        SELECT c.* INTO chunk_information
          FROM timescaledb_information.chunks AS c
         WHERE c.chunk_schema = schema AND c.chunk_name = chunk_table_name;
        SELECT c.* INTO chunk
          FROM _timescaledb_catalog.chunk AS c
         WHERE c.schema_name = schema AND c.table_name = chunk_table_name;

        -- show_chunks was padded by 3 days, so drop chunks whose range does not overlap the window.
        IF chunk_information.range_end <= newer_than OR chunk_information.range_start >= older_than THEN
            RAISE NOTICE 'Skipping chunk due to no data points in bound: %.% (% -> %)', chunk.schema_name, chunk.table_name, chunk_information.range_start, chunk_information.range_end;
            CONTINUE;
        END IF;

        window_delete := FORMAT(
            $$DELETE FROM %I.%I WHERE source_id = %L AND label = %L AND time >= %L AND time < %L;$$,
            chunk.schema_name, chunk.table_name, source_id, label, newer_than, older_than);

        IF chunk.compressed_chunk_id IS NOT NULL THEN
            -- Chunk is compressed, lets see what we have to do
            SELECT c.* INTO data_chunk FROM _timescaledb_catalog.chunk AS c WHERE c.id = chunk.compressed_chunk_id;
            RAISE NOTICE 'Working through compressed chunk: %.% (% -> %)', data_chunk.schema_name, data_chunk.table_name, chunk_information.range_start, chunk_information.range_end;
            IF chunk_information.range_start >= newer_than AND chunk_information.range_end <= older_than THEN
                -- Chunk is all encompassing: every point of the series in it is inside the window.
                needs_decompression := false;
            ELSE
                -- Edge of request falls in the chunk: only decompress if the series has points to keep.
                -- NOTE(review): relies on a direct query of the chunk table returning its compressed rows.
                EXECUTE FORMAT($$
                    SELECT min(time), max(time)
                    FROM %I.%I
                    WHERE source_id = %L AND label = %L
                $$, chunk.schema_name, chunk.table_name, source_id, label) INTO earliest_point, latest_point;
                -- older_than is exclusive, so a point exactly at older_than must be kept (>=, not >).
                -- No points at all leaves both NULL: nothing to keep.
                needs_decompression := COALESCE(earliest_point < newer_than OR latest_point >= older_than, false);
            END IF;

            IF needs_decompression THEN
                RAISE NOTICE 'Chunk requires decompression in order to delete';
                stmts := ARRAY[
                    FORMAT($$SELECT public.decompress_chunk(%L);$$, FORMAT('%I.%I', chunk.schema_name, chunk.table_name)),
                    window_delete
                ];
            ELSE
                stmts := ARRAY[
                    FORMAT($$DELETE FROM %I.%I WHERE source_id = %L AND label = %L;$$,
                           data_chunk.schema_name, data_chunk.table_name, source_id, label)
                ];
            END IF;
        ELSE
            -- Chunk is not compressed, just delete like normal
            RAISE NOTICE 'Updating series in uncompressed chunk: %.% (% -> %)', chunk.schema_name, chunk.table_name, chunk_information.range_start, chunk_information.range_end;
            stmts := ARRAY[window_delete];
        END IF;

        -- One place decides between printing and executing, so both paths always run the same SQL.
        FOREACH stmt IN ARRAY stmts
        LOOP
            IF dry_run THEN
                RAISE NOTICE 'Dry-Run: %', stmt;
            ELSE
                EXECUTE stmt;
            END IF;
        END LOOP;

        -- Commit per chunk so a long run does not hold every decompressed chunk in one transaction.
        IF NOT dry_run THEN
            COMMIT;
        END IF;
    END LOOP;
END;
$proc$
LANGUAGE PLPGSQL;
@mgagliardo91
Copy link
Author

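The script above has no side effects in its current state. To make it take effect, switch the `RAISE NOTICE '%', FORMAT(...)` commands to `EXECUTE FORMAT(...)` and make sure the `COMMIT` is not commented out. `delete_source_data` also defaults to `dry_run => true`, so pass `dry_run => false` to actually delete.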

An example of usage:

CALL public.delete_source_data('ndustrial', 'am17070847', 'DMD_S_kVA', '2021-10-30 00:00:00+00'::timestamptz, '2021-11-16 00:00:00+00'::timestamptz);

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment