Skip to content

Instantly share code, notes, and snippets.

@geon
Created June 1, 2017 12:37
Show Gist options
  • Save geon/38e822719c1e567ea6a9d33401f273ef to your computer and use it in GitHub Desktop.
Save geon/38e822719c1e567ea6a9d33401f273ef to your computer and use it in GitHub Desktop.
pipelinedb event clustering
-- DROP SCHEMA public CASCADE;
-- CREATE SCHEMA public;
-- Input stream: each bad measurement is written here so that the queries
-- further down can cluster consecutive bad measurements into errors.
CREATE STREAM bad_measurements (
    "measurement" REAL,
    "timestamp" TIMESTAMPTZ
);
-- Keeps only the most recent bad measurement. The view is useful solely
-- because it creates an output stream of (old, new) deltas, which is what the
-- clustering logic consumes via output_of('last_bad_measurements').
CREATE CONTINUOUS VIEW last_bad_measurements AS
    SELECT
        -- The measurement paired with the highest timestamp seen so far.
        keyed_max("timestamp", "measurement") AS "measurement",
        MAX("timestamp") AS "timestamp"
    FROM bad_measurements;
-- Tag every delta from last_bad_measurements with an error id so that a run of
-- closely spaced bad measurements shares one id (one error per cluster).
-- The sequence is the source of those ids.
CREATE SEQUENCE tagged_bad_measurements_error_id_seq;

CREATE CONTINUOUS TRANSFORM tagged_bad_measurements AS
    SELECT
        CASE
            -- No previous row (old is NULL): start a new error.
            WHEN (old) IS NULL
                THEN nextval('tagged_bad_measurements_error_id_seq')
            -- More than 2 seconds since the previous measurement: the previous
            -- cluster is over, so start a new error.
            -- NOTE(review): an earlier comment described this gap as 1 h while
            -- the code uses 2 seconds -- confirm the intended threshold.
            WHEN (new)."timestamp" - (old)."timestamp" > INTERVAL '2 second'
                THEN nextval('tagged_bad_measurements_error_id_seq')
            -- Otherwise the measurement belongs to the current error.
            -- NOTE(review): currval is session-local in PostgreSQL and fails if
            -- nextval has not been called in the same session -- confirm this
            -- holds for the process that evaluates the transform.
            ELSE currval('tagged_bad_measurements_error_id_seq')
        END AS "errorId",
        (new)."measurement",
        (new)."timestamp"
    FROM output_of('last_bad_measurements');
-- One summary row per error id, aggregated over the tagged measurements.
CREATE CONTINUOUS VIEW errors AS
    SELECT
        "errorId",
        MAX("measurement") AS "maxMeasurement",
        COUNT(*) AS "numMeasurements",
        -- Earliest and latest timestamp in the cluster, and the span between them.
        MIN("timestamp") AS "beginTimestamp",
        MAX("timestamp") AS "endTimestamp",
        MAX("timestamp") - MIN("timestamp") AS "duration"
    FROM output_of('tagged_bad_measurements')
    GROUP BY "errorId";
-- Scenario 1 (reported as broken): insert ten measurements with a single
-- multi-row INSERT. The timestamps form two runs of five, one second apart,
-- with a 56-second gap between the runs.
-- NOTE(review): presumably the rows of one statement are processed as a single
-- batch, so last_bad_measurements emits only one (old, new) delta and the
-- intermediate measurements never reach tagged_bad_measurements -- confirm
-- against PipelineDB's output-stream semantics.
INSERT INTO bad_measurements ("measurement", "timestamp") VALUES
(1, '2017-06-01 13:48:01'),
(1, '2017-06-01 13:48:02'),
(1, '2017-06-01 13:48:03'),
(1, '2017-06-01 13:48:04'),
(1, '2017-06-01 13:48:05'),
(1, '2017-06-01 13:49:01'),
(1, '2017-06-01 13:49:02'),
(1, '2017-06-01 13:49:03'),
(1, '2017-06-01 13:49:04'),
(1, '2017-06-01 13:49:05');
-- Scenario 2 (also reported as not working): the same pattern of timestamps,
-- one hour later, sent as ten separate single-row INSERT statements executed
-- back to back with no pause between them.
-- NOTE(review): presumably statements issued this quickly can still be
-- combined into one batch by the continuous query -- confirm.
INSERT INTO bad_measurements ("measurement", "timestamp") VALUES (1, '2017-06-01 14:48:01');
INSERT INTO bad_measurements ("measurement", "timestamp") VALUES (1, '2017-06-01 14:48:02');
INSERT INTO bad_measurements ("measurement", "timestamp") VALUES (1, '2017-06-01 14:48:03');
INSERT INTO bad_measurements ("measurement", "timestamp") VALUES (1, '2017-06-01 14:48:04');
INSERT INTO bad_measurements ("measurement", "timestamp") VALUES (1, '2017-06-01 14:48:05');
INSERT INTO bad_measurements ("measurement", "timestamp") VALUES (1, '2017-06-01 14:49:01');
INSERT INTO bad_measurements ("measurement", "timestamp") VALUES (1, '2017-06-01 14:49:02');
INSERT INTO bad_measurements ("measurement", "timestamp") VALUES (1, '2017-06-01 14:49:03');
INSERT INTO bad_measurements ("measurement", "timestamp") VALUES (1, '2017-06-01 14:49:04');
INSERT INTO bad_measurements ("measurement", "timestamp") VALUES (1, '2017-06-01 14:49:05');
-- Scenario 3 (reported as working): the same pattern of timestamps, another
-- hour later, inserted one row at a time with a one-second pg_sleep after
-- every INSERT. The sleeps only pace the inserts in wall-clock time; the
-- "timestamp" values themselves are the literals given here.
INSERT INTO bad_measurements ("measurement", "timestamp") VALUES (1, '2017-06-01 15:48:01');
SELECT pg_sleep(1);
INSERT INTO bad_measurements ("measurement", "timestamp") VALUES (1, '2017-06-01 15:48:02');
SELECT pg_sleep(1);
INSERT INTO bad_measurements ("measurement", "timestamp") VALUES (1, '2017-06-01 15:48:03');
SELECT pg_sleep(1);
INSERT INTO bad_measurements ("measurement", "timestamp") VALUES (1, '2017-06-01 15:48:04');
SELECT pg_sleep(1);
INSERT INTO bad_measurements ("measurement", "timestamp") VALUES (1, '2017-06-01 15:48:05');
SELECT pg_sleep(1);
INSERT INTO bad_measurements ("measurement", "timestamp") VALUES (1, '2017-06-01 15:49:01');
SELECT pg_sleep(1);
INSERT INTO bad_measurements ("measurement", "timestamp") VALUES (1, '2017-06-01 15:49:02');
SELECT pg_sleep(1);
INSERT INTO bad_measurements ("measurement", "timestamp") VALUES (1, '2017-06-01 15:49:03');
SELECT pg_sleep(1);
INSERT INTO bad_measurements ("measurement", "timestamp") VALUES (1, '2017-06-01 15:49:04');
SELECT pg_sleep(1);
INSERT INTO bad_measurements ("measurement", "timestamp") VALUES (1, '2017-06-01 15:49:05');
SELECT pg_sleep(1);
-- Show the resulting error clusters. The columns are listed explicitly (in the
-- order the errors view defines them) and the rows are sorted by error id, so
-- the output is stable and the scenarios above can be compared run to run.
SELECT
    "errorId",
    "maxMeasurement",
    "numMeasurements",
    "beginTimestamp",
    "endTimestamp",
    "duration"
FROM errors
ORDER BY "errorId";
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment