
Timeseries Data in MySQL

  • IoT Readings
  • Performance Metrics
  • Heartbeat System

We operate on the timeseries data: chunk it into fixed-size buckets and compute counts, averages, minimums and maximums over each bucket.

  • table
CREATE TABLE timeseries (
    `id` bigint UNSIGNED NOT NULL AUTO_INCREMENT,
    `k` varchar(255) CHARACTER SET ascii COLLATE ascii_bin NOT NULL DEFAULT 'default', -- series key
    `v` varchar(255) CHARACTER SET ascii COLLATE ascii_bin NOT NULL DEFAULT '',        -- reading value
    `datapoint` int UNSIGNED NOT NULL DEFAULT 0,
    `dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,                                  -- reading time
    PRIMARY KEY (`id`)
)
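
A minimal sketch of seed data for trying the queries below, assuming readings are numeric values stored as strings (the key 'default' matches the filters used later; the values themselves are made up):

INSERT INTO timeseries (k, v, datapoint, dt) VALUES
    ('default', '41', 1, NOW() - INTERVAL 10 MINUTE),
    ('default', '43', 1, NOW() - INTERVAL 5 MINUTE),
    ('default', '42', 1, NOW())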
  • all rows
SELECT
    k,
    v,
    datapoint,
    UNIX_TIMESTAMP( dt ) AS ts,
    FLOOR( UNIX_TIMESTAMP( dt ) / ( 6 * 60 * 60 / 60 )) AS bucket, -- 360-second buckets: a 6-hour chart with 60 points
    COUNT(*) OVER bucket AS count_over_bucket,
    AVG( v ) OVER bucket AS avg_v_over_bucket,
    MAX( v ) OVER bucket AS max_v_over_bucket,
    MIN( v ) OVER bucket AS min_v_over_bucket
FROM
    `timeseries`
WHERE
    k = 'default'
WINDOW bucket AS (
    PARTITION BY FLOOR( UNIX_TIMESTAMP( dt ) / ( 6 * 60 * 60 / 60 ))
    ORDER BY dt
)
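
Note: because the named window has an ORDER BY, the default frame is RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, so the count/avg/max/min above are running values within each bucket and only reach the full-bucket figures on the bucket's last row (which is exactly why the next query keeps only that row). A minimal sketch of how to get whole-bucket aggregates on every row instead, by spelling out the frame explicitly:

SELECT
    k,
    v,
    UNIX_TIMESTAMP( dt ) AS ts,
    FLOOR( UNIX_TIMESTAMP( dt ) / ( 6 * 60 * 60 / 60 )) AS bucket,
    COUNT(*) OVER w AS count_over_bucket,
    AVG( v ) OVER w AS avg_v_over_bucket,
    MAX( v ) OVER w AS max_v_over_bucket,
    MIN( v ) OVER w AS min_v_over_bucket
FROM
    `timeseries`
WHERE
    k = 'default'
WINDOW w AS (
    PARTITION BY FLOOR( UNIX_TIMESTAMP( dt ) / ( 6 * 60 * 60 / 60 ))
    ORDER BY dt
    ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING -- whole-partition frame
)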
  • filtering intermediate rows and keeping one row per bucket
WITH windowed_data AS (
    SELECT
        k,
        v,
        datapoint,
        UNIX_TIMESTAMP( dt ) AS last_timestamp,
        FIRST_VALUE( UNIX_TIMESTAMP( dt ) ) OVER bucket AS first_timestamp,
        FLOOR( UNIX_TIMESTAMP( dt ) / ( 6 * 60 * 60 / 60 )) AS bucket, -- 360-second buckets: a 6-hour chart with 60 points
        COUNT(*) OVER bucket AS count_over_bucket,
        AVG( v ) OVER bucket AS avg_v_over_bucket,
        MAX( v ) OVER bucket AS max_v_over_bucket,
        MIN( v ) OVER bucket AS min_v_over_bucket,
        IF( LEAD( k, 1 ) OVER bucket IS NULL, 1, 0 ) AS final_row_in_bucket -- LEAD looks one row ahead within the bucket; NULL means this is the bucket's last row
    FROM
        `timeseries`
    WHERE
        k = 'default'
    WINDOW bucket AS ( PARTITION BY FLOOR( UNIX_TIMESTAMP( dt ) / ( 6 * 60 * 60 / 60 )) ORDER BY dt )
) SELECT
    *
FROM
    windowed_data
WHERE
    final_row_in_bucket = 1
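
The same filtering can be sketched with ROW_NUMBER() instead of the LEAD(...) IS NULL trick: number the rows in each bucket from newest to oldest and keep row 1, whose running aggregates already cover the whole bucket:

WITH windowed_data AS (
    SELECT
        k,
        v,
        datapoint,
        UNIX_TIMESTAMP( dt ) AS last_timestamp,
        FLOOR( UNIX_TIMESTAMP( dt ) / ( 6 * 60 * 60 / 60 )) AS bucket,
        COUNT(*) OVER bucket AS count_over_bucket,
        AVG( v ) OVER bucket AS avg_v_over_bucket,
        MAX( v ) OVER bucket AS max_v_over_bucket,
        MIN( v ) OVER bucket AS min_v_over_bucket,
        ROW_NUMBER() OVER ( PARTITION BY FLOOR( UNIX_TIMESTAMP( dt ) / ( 6 * 60 * 60 / 60 )) ORDER BY dt DESC ) AS rn -- 1 = newest row in the bucket
    FROM
        `timeseries`
    WHERE
        k = 'default'
    WINDOW bucket AS ( PARTITION BY FLOOR( UNIX_TIMESTAMP( dt ) / ( 6 * 60 * 60 / 60 )) ORDER BY dt )
) SELECT
    *
FROM
    windowed_data
WHERE
    rn = 1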
  • for all of the keys, limited to the last 6 hours of data to speed things up
WITH windowed_data AS (
    SELECT
        '6hr_60pt' AS `type`,
        k,
        v,
        datapoint,
        UNIX_TIMESTAMP( dt ) AS last_timestamp,
        FIRST_VALUE( UNIX_TIMESTAMP( dt ) ) OVER bucket AS first_timestamp,
        FLOOR( UNIX_TIMESTAMP( dt ) / ( 6 * 60 * 60 / 60 )) AS bucket, -- 360-second buckets: a 6-hour chart with 60 points
        COUNT(*) OVER bucket AS count_over_bucket,
        AVG( v ) OVER bucket AS avg_v_over_bucket,
        MAX( v ) OVER bucket AS max_v_over_bucket,
        MIN( v ) OVER bucket AS min_v_over_bucket,
        IF( LEAD( k, 1 ) OVER bucket IS NULL, 1, 0 ) AS final_row_in_bucket -- LEAD looks one row ahead within the bucket; NULL means this is the bucket's last row
    FROM
        `timeseries`
    WHERE
        dt >= NOW() - INTERVAL 6 HOUR -- only data from the last 6 hours
    WINDOW bucket AS ( PARTITION BY k, FLOOR( UNIX_TIMESTAMP( dt ) / ( 6 * 60 * 60 / 60 )) ORDER BY k, dt )
) SELECT
    *
FROM
    windowed_data
WHERE
    final_row_in_bucket = 1

Next, to speed things up further, persist the computed buckets into a summary (rollup) table and only calculate the last, still-open bucket at run time:

INSERT INTO timeseries_rollup SELECT * FROM windowed_data

Have a compound index over type, key and bucket; on collision, update count/avg/min/max; then select directly from the rollup table. We could drop the INTERVAL from 6 hours to 15 minutes and keep updating the rollup table while calculating the timeseries in the background.
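
A sketch of what that rollup could look like. The table name timeseries_rollup and the compound key over (type, k, bucket) come from the notes above; the remaining column names and types are assumptions. With that key in place, INSERT ... ON DUPLICATE KEY UPDATE overwrites a bucket's summary each time the background job recomputes it:

CREATE TABLE timeseries_rollup (
    `type` varchar(32) NOT NULL,
    `k` varchar(255) CHARACTER SET ascii COLLATE ascii_bin NOT NULL DEFAULT 'default',
    `bucket` bigint UNSIGNED NOT NULL,
    `last_timestamp` bigint UNSIGNED NOT NULL,
    `count_over_bucket` int UNSIGNED NOT NULL DEFAULT 0,
    `avg_v_over_bucket` double NULL,
    `max_v_over_bucket` double NULL,
    `min_v_over_bucket` double NULL,
    PRIMARY KEY (`type`, `k`, `bucket`)
);

INSERT INTO timeseries_rollup
    (`type`, `k`, `bucket`, `last_timestamp`, `count_over_bucket`,
     `avg_v_over_bucket`, `max_v_over_bucket`, `min_v_over_bucket`)
WITH windowed_data AS (
    SELECT
        '6hr_60pt' AS `type`,
        k,
        UNIX_TIMESTAMP( dt ) AS last_timestamp,
        FLOOR( UNIX_TIMESTAMP( dt ) / ( 6 * 60 * 60 / 60 )) AS bucket,
        COUNT(*) OVER bucket AS count_over_bucket,
        AVG( v ) OVER bucket AS avg_v_over_bucket,
        MAX( v ) OVER bucket AS max_v_over_bucket,
        MIN( v ) OVER bucket AS min_v_over_bucket,
        IF( LEAD( k, 1 ) OVER bucket IS NULL, 1, 0 ) AS final_row_in_bucket
    FROM
        `timeseries`
    WHERE
        -- 15-minute tail per the note above; ideally aligned to a bucket boundary so a
        -- partially covered bucket is not overwritten with partial aggregates
        dt >= NOW() - INTERVAL 15 MINUTE
    WINDOW bucket AS ( PARTITION BY k, FLOOR( UNIX_TIMESTAMP( dt ) / ( 6 * 60 * 60 / 60 )) ORDER BY k, dt )
)
SELECT
    `type`, k, bucket, last_timestamp, count_over_bucket,
    avg_v_over_bucket, max_v_over_bucket, min_v_over_bucket
FROM windowed_data
WHERE final_row_in_bucket = 1
ON DUPLICATE KEY UPDATE
    last_timestamp    = VALUES(last_timestamp),
    count_over_bucket = VALUES(count_over_bucket),
    avg_v_over_bucket = VALUES(avg_v_over_bucket),
    max_v_over_bucket = VALUES(max_v_over_bucket),
    min_v_over_bucket = VALUES(min_v_over_bucket);

The chart query can then read the closed buckets straight from timeseries_rollup and compute only the one still-open bucket on the fly.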

A Knex migration creating the same kind of table (here named time_series_booking), plus a compound index:

const booking_time_series = 'time_series_booking'

exports.up = async knex => {
  await knex.schema.createTable(booking_time_series, table => {
    // bigint unsigned auto-increment primary key, matching the DDL above
    table.bigIncrements('id')

    table.string('k', 255).collate('ascii_bin').notNullable().defaultTo('default')

    table.string('v', 255).collate('ascii_bin').notNullable().defaultTo('')

    table.integer('datapoint').unsigned().notNullable().defaultTo(1)

    table.datetime('dt').notNullable().defaultTo(knex.fn.now())
  })

  await knex.schema.table(booking_time_series, table => {
    // covering index for the dt range scans and key lookups in the queries above
    table.index(['dt', 'k', 'v', 'datapoint'])
  })
}

exports.down = async knex => {
  await knex.schema.dropTable(booking_time_series)
}