Timeseries Data in MySQL
- IoT Readings
- Performance Metrics
- Heartbeat System
We operate on the timeseries data, chunk it into fixed-width buckets, and compute counts, averages, minimums and maximums over each bucket; the bucket arithmetic is sketched below
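- bucket width check: 6 hours * 60 * 60 seconds / 60 chart points = 360-second buckets (a quick sketch; the literal timestamp is only an example and assumes a UTC session time zone)
SELECT FLOOR( UNIX_TIMESTAMP( '2024-01-01 00:07:30' ) / ( 6 * 60 * 60 / 60 )) AS bucket;
-- every row between 00:06:00 and 00:11:59 on that day lands in the same 360-second bucket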
- table
CREATE TABLE timeseries (
`id` bigint UNSIGNED NOT NULL AUTO_INCREMENT,
`k` varchar(255) CHARACTER SET ascii COLLATE ascii_bin NOT NULL DEFAULT 'default',
`v` varchar(255) CHARACTER SET ascii COLLATE ascii_bin NOT NULL DEFAULT '', -- note: v is a string; AVG() casts it implicitly, but bare MAX()/MIN() compare lexicographically
`datapoint` int UNSIGNED NOT NULL DEFAULT 0,
`dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
)
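- sample rows (hypothetical values, just to make the queries below reproducible)
INSERT INTO timeseries ( k, v, datapoint, dt ) VALUES
    ( 'default', '42', 1, NOW() - INTERVAL 10 MINUTE ),
    ( 'default', '57', 1, NOW() - INTERVAL 5 MINUTE ),
    ( 'default', '13', 1, NOW() );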
- all rows, with running per-bucket aggregates
SELECT
    k,
    v,
    datapoint,
    UNIX_TIMESTAMP( dt ) AS ts,
    FLOOR( UNIX_TIMESTAMP( dt ) / ( 6 * 60 * 60 / 60 )) AS bucket, -- 6-hour chart, 60 points => 360-second buckets
    -- note: with ORDER BY in the window, the default frame is RANGE UNBOUNDED PRECEDING,
    -- so these are running aggregates that only cover the whole bucket at its final row
    COUNT(*) OVER bucket AS count_over_bucket,
    AVG( v ) OVER bucket AS avg_v_over_bucket,
    MAX( v ) OVER bucket AS max_v_over_bucket,
    MIN( v ) OVER bucket AS min_v_over_bucket
FROM
    `timeseries`
WHERE
    k = 'default'
WINDOW bucket AS (
    PARTITION BY FLOOR( UNIX_TIMESTAMP( dt ) / ( 6 * 60 * 60 / 60 ))
    ORDER BY dt
)
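- if every row should instead carry the aggregates of its whole bucket, the frame can be pinned explicitly; a minimal sketch of that variant (not needed once only the final row per bucket is kept, as in the next query)
SELECT
    k,
    v,
    datapoint,
    FLOOR( UNIX_TIMESTAMP( dt ) / ( 6 * 60 * 60 / 60 )) AS bucket,
    AVG( v ) OVER bucket AS avg_v_over_bucket
FROM
    `timeseries`
WHERE
    k = 'default'
WINDOW bucket AS (
    PARTITION BY FLOOR( UNIX_TIMESTAMP( dt ) / ( 6 * 60 * 60 / 60 ))
    ORDER BY dt
    ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
)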
- filtering intermediate rows and keeping one row per bucket
WITH windowed_data AS (
    SELECT
        k,
        v,
        datapoint,
        UNIX_TIMESTAMP( dt ) AS last_timestamp,
        FIRST_VALUE( UNIX_TIMESTAMP( dt ) ) OVER bucket AS first_timestamp,
        FLOOR( UNIX_TIMESTAMP( dt ) / ( 6 * 60 * 60 / 60 )) AS bucket, -- 6-hour chart, 60 points => 360-second buckets
        COUNT(*) OVER bucket AS count_over_bucket,
        AVG( v ) OVER bucket AS avg_v_over_bucket,
        MAX( v ) OVER bucket AS max_v_over_bucket,
        MIN( v ) OVER bucket AS min_v_over_bucket,
        IF( LEAD( k, 1 ) OVER bucket IS NULL, 1, 0 ) AS final_row_in_bucket -- LEAD looks one row forward within the current partition; NULL means this is the bucket's last row
    FROM
        `timeseries`
    WHERE
        k = 'default'
    WINDOW bucket AS (
        PARTITION BY FLOOR( UNIX_TIMESTAMP( dt ) / ( 6 * 60 * 60 / 60 ))
        ORDER BY dt
    )
)
SELECT
    *
FROM
    windowed_data
WHERE
    final_row_in_bucket = 1
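- for comparison, a plain GROUP BY sketch that yields the same per-bucket aggregates; the window version above is kept because GROUP BY cannot also carry the raw v/datapoint of the bucket's last row (explicit casts added here because v is a string and bare MAX()/MIN() would compare lexicographically)
SELECT
    k,
    FLOOR( UNIX_TIMESTAMP( dt ) / ( 6 * 60 * 60 / 60 )) AS bucket,
    MIN( UNIX_TIMESTAMP( dt ) ) AS first_timestamp,
    MAX( UNIX_TIMESTAMP( dt ) ) AS last_timestamp,
    COUNT(*) AS count_over_bucket,
    AVG( CAST( v AS DECIMAL( 20, 6 ))) AS avg_v_over_bucket,
    MAX( CAST( v AS DECIMAL( 20, 6 ))) AS max_v_over_bucket,
    MIN( CAST( v AS DECIMAL( 20, 6 ))) AS min_v_over_bucket
FROM
    `timeseries`
WHERE
    k = 'default'
GROUP BY
    k, bucket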
- all keys at once, restricted to the last 6 hours of data to keep the scan small
WITH windowed_data AS (
    SELECT
        '6hr_60pt' AS `type`,
        k,
        v,
        datapoint,
        UNIX_TIMESTAMP( dt ) AS last_timestamp,
        FIRST_VALUE( UNIX_TIMESTAMP( dt ) ) OVER bucket AS first_timestamp,
        FLOOR( UNIX_TIMESTAMP( dt ) / ( 6 * 60 * 60 / 60 )) AS bucket, -- 6-hour chart, 60 points => 360-second buckets
        COUNT(*) OVER bucket AS count_over_bucket,
        AVG( v ) OVER bucket AS avg_v_over_bucket,
        MAX( v ) OVER bucket AS max_v_over_bucket,
        MIN( v ) OVER bucket AS min_v_over_bucket,
        IF( LEAD( k, 1 ) OVER bucket IS NULL, 1, 0 ) AS final_row_in_bucket -- last row of each (k, bucket) partition
    FROM
        `timeseries`
    WHERE
        dt >= NOW() - INTERVAL 6 HOUR -- data from the last 6 hours only
    WINDOW bucket AS (
        PARTITION BY k, FLOOR( UNIX_TIMESTAMP( dt ) / ( 6 * 60 * 60 / 60 ))
        ORDER BY dt
    )
)
SELECT
    *
FROM
    windowed_data
WHERE
    final_row_in_bucket = 1
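- an index sketch for the 6-hour scan (an assumption about the workload; the knex migration at the end creates the same shape for its own table): leading with dt lets the range predicate use the index, and including k, v and datapoint makes it covering, so the window query never touches the base rows
ALTER TABLE timeseries ADD INDEX idx_dt_k_v_datapoint ( dt, k, v, datapoint );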
- next speed-up: persist the computed buckets into a rollup table, and at run time recompute only the still-open bucket, roughly
INSERT INTO timeseries_rollup ( SELECT * FROM windowed_data )
Give the rollup a compound unique key over (type, key, bucket) and update count/avg/min/max on collision. Reads then select directly from the rollup table, the query INTERVAL can drop from 6 hours to 15 minutes, and a background job keeps the rollup table updated while the raw timeseries keeps growing; a sketch follows.
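- a hedged sketch of that rollup (table and column names are assumptions, not an existing schema); the upsert simply overwrites on collision, which stays correct as long as each refresh re-reads whole buckets, i.e. the refresh window starts on a 360-second bucket boundary
CREATE TABLE timeseries_rollup (
    `type` varchar(32) CHARACTER SET ascii COLLATE ascii_bin NOT NULL,
    `k` varchar(255) CHARACTER SET ascii COLLATE ascii_bin NOT NULL,
    `bucket` int UNSIGNED NOT NULL,
    `count_over_bucket` int UNSIGNED NOT NULL,
    `avg_v_over_bucket` double NOT NULL,
    `max_v_over_bucket` double NOT NULL,
    `min_v_over_bucket` double NOT NULL,
    PRIMARY KEY ( `type`, `k`, `bucket` ) -- the compound key that collisions resolve against
);

INSERT INTO timeseries_rollup ( `type`, k, bucket, count_over_bucket, avg_v_over_bucket, max_v_over_bucket, min_v_over_bucket )
WITH windowed_data AS (
    SELECT
        '6hr_60pt' AS `type`,
        k,
        FLOOR( UNIX_TIMESTAMP( dt ) / ( 6 * 60 * 60 / 60 )) AS bucket,
        COUNT(*) OVER bucket AS count_over_bucket,
        AVG( v ) OVER bucket AS avg_v_over_bucket,
        MAX( v ) OVER bucket AS max_v_over_bucket,
        MIN( v ) OVER bucket AS min_v_over_bucket,
        IF( LEAD( k, 1 ) OVER bucket IS NULL, 1, 0 ) AS final_row_in_bucket
    FROM
        `timeseries`
    WHERE
        dt >= NOW() - INTERVAL 15 MINUTE -- refreshed by the background job
    WINDOW bucket AS (
        PARTITION BY k, FLOOR( UNIX_TIMESTAMP( dt ) / ( 6 * 60 * 60 / 60 ))
        ORDER BY dt
    )
)
SELECT
    `type`, k, bucket, count_over_bucket, avg_v_over_bucket, max_v_over_bucket, min_v_over_bucket
FROM
    windowed_data
WHERE
    final_row_in_bucket = 1
ON DUPLICATE KEY UPDATE
    count_over_bucket = VALUES( count_over_bucket ),
    avg_v_over_bucket = VALUES( avg_v_over_bucket ),
    max_v_over_bucket = VALUES( max_v_over_bucket ),
    min_v_over_bucket = VALUES( min_v_over_bucket );
-- note: VALUES() in ON DUPLICATE KEY UPDATE is deprecated from MySQL 8.0.20 in favour of row aliases,
-- and the 15-minute window should be aligned to bucket boundaries so the overwrite never sees a partial bucket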
const booking_time_series = 'time_series_booking'

exports.up = async knex => {
    await knex.schema.createTable(booking_time_series, table => {
        table.bigIncrements('id') // bigint unsigned auto-increment, already the primary key
        table.string('k', 255).collate('ascii_bin').notNullable().defaultTo('default')
        table.string('v', 255).collate('ascii_bin').notNullable().defaultTo('')
        table.integer('datapoint').unsigned().notNullable().defaultTo(1)
        table.datetime('dt').notNullable().defaultTo(knex.fn.now())
    })
    await knex.schema.table(booking_time_series, table => {
        table.index(['dt', 'k', 'v', 'datapoint']) // covering index for the dt-range window queries above
    })
}

exports.down = async knex => {
    await knex.schema.dropTable(booking_time_series)
}