Skip to content

Instantly share code, notes, and snippets.

@den-crane
Last active December 29, 2023 02:02
Show Gist options
  • Star 8 You must be signed in to star a gist
  • Fork 3 You must be signed in to fork a gist
  • Save den-crane/49ce2ae3a688651b9c2dd85ee592cb15 to your computer and use it in GitHub Desktop.
Save den-crane/49ce2ae3a688651b9c2dd85ee592cb15 to your computer and use it in GitHub Desktop.
-- Event sourcing using AggregatingMergeTree
-- Reset the demo objects in reverse dependency order: the materialized view
-- first, then its target table, then its source table. That way no view is
-- ever left referencing a table that has already been dropped, and the drops
-- still succeed if the server is configured to check table dependencies on DROP.
DROP TABLE IF EXISTS final_states_by_day_mv;
DROP TABLE IF EXISTS final_states_by_day;
DROP TABLE IF EXISTS states_raw;
-- Raw event log: one row per reported state change of a process.
-- statetime falls back to the insert time when the writer omits it.
CREATE TABLE states_raw
(
    process    String,
    state      String,
    stateint   Int64,
    statevalue Float64,
    statetime  DateTime DEFAULT now()
)
ENGINE = MergeTree()
ORDER BY (process, statetime);
-- Per-process, per-day rollup of states_raw, stored as partial aggregation
-- states. The state types must match exactly what final_states_by_day_mv
-- produces (maxState -> max, argMaxState -> argMax, sumState -> sum), so the
-- function names are spelled the same way here as in the view. Read the
-- columns back with the matching -Merge combinators.
CREATE TABLE final_states_by_day
(
    process         String,
    -- greatest statetime seen for the process on statedate
    finalstatetime  AggregateFunction(max, DateTime),
    -- state / stateint of the event with the greatest statetime
    finalstate      AggregateFunction(argMax, String, DateTime),
    finalstateint   AggregateFunction(argMax, Int64, DateTime),
    -- running total of statevalue
    finalstatevalue AggregateFunction(sum, Float64),
    -- number of raw events (the view sums a constant 1 per row)
    finalstatecount AggregateFunction(sum, UInt64),
    statedate       Date
)
ENGINE = AggregatingMergeTree
PARTITION BY toYYYYMM(statedate)
ORDER BY (process, statedate);
-- Runs on each block inserted into states_raw and appends that block's
-- per-(process, day) partial aggregation states to final_states_by_day.
-- The GROUP BY only covers the rows of the current insert, so the target can
-- hold several rows for one key until background merges combine them.
-- Readers therefore have to GROUP BY and use the -Merge functions.
CREATE MATERIALIZED VIEW final_states_by_day_mv
TO final_states_by_day
AS
SELECT
    process,
    maxState(statetime)              AS finalstatetime,
    argMaxState(state, statetime)    AS finalstate,
    argMaxState(stateint, statetime) AS finalstateint,
    sumState(statevalue)             AS finalstatevalue,
    sumState(toUInt64(1))            AS finalstatecount,  -- 1 per raw event
    toDate(statetime)                AS statedate
FROM states_raw
GROUP BY process, statedate;
-- Sample events for the current day: per process, a 'first state' 100 s ago
-- and a 'last state' at the insert time.
-- An explicit column list keeps the insert correct even if the column order
-- of states_raw changes.
INSERT INTO states_raw (process, state, stateint, statevalue, statetime) VALUES
    ('process1', 'first state', 0, 1.1, now() - 100),
    ('process2', 'first state', 0, 3.3, now() - 100),
    ('process1', 'last state', 19, 1.1, now()),
    ('process2', 'last state', 17, 3.3, now());
-- The same pattern shifted back 24 h (3600 * 24 s), so the view produces a
-- second statedate per process.
INSERT INTO states_raw (process, state, stateint, statevalue, statetime) VALUES
    ('process1', 'first state', 0, 1.1, now() - 100 - 3600 * 24),
    ('process2', 'first state', 0, 3.3, now() - 100 - 3600 * 24),
    ('process1', 'last state', 10, 1.1, now() - 3600 * 24),
    ('process2', 'last state', 12, 3.3, now() - 3600 * 24);
-- Inspect the raw event log. There is no ORDER BY, so row order is whatever
-- the parts are read in.
SELECT
    process,
    state,
    stateint,
    statevalue,
    statetime
FROM states_raw;
┌─process──┬─state───────┬─stateint─┬─statevalue─┬───────────statetime─┐
│ process1 │ first state │ 0 │ 1.1 │ 2018-10-18 14:16:35 │
│ process1 │ last state │ 10 │ 1.1 │ 2018-10-18 14:18:15 │
│ process2 │ first state │ 0 │ 3.3 │ 2018-10-18 14:16:35 │
│ process2 │ last state │ 12 │ 3.3 │ 2018-10-18 14:18:15 │
│ process1 │ first state │ 0 │ 1.1 │ 2018-10-19 14:16:00 │
│ process1 │ last state │ 19 │ 1.1 │ 2018-10-19 14:17:40 │
│ process2 │ first state │ 0 │ 3.3 │ 2018-10-19 14:16:00 │
│ process2 │ last state │ 17 │ 3.3 │ 2018-10-19 14:17:40 │
└──────────┴─────────────┴──────────┴────────────┴─────────────────────┘
-- Final state of each process, per day
-- Collapse the stored partial states into one row per (day, process).
-- The GROUP BY is required because unmerged parts may still hold several
-- rows for the same key.
SELECT
    statedate,
    process,
    maxMerge(finalstatetime),
    argMaxMerge(finalstate),
    argMaxMerge(finalstateint),
    sumMerge(finalstatevalue),
    sumMerge(finalstatecount)
FROM final_states_by_day
GROUP BY statedate, process
ORDER BY statedate ASC, process ASC;
┌──statedate─┬─process──┬─maxMerge(finalstatetime)─┬─argMaxMerge(finalstate)─┬─argMaxMerge(finalstateint)─┬─sumMerge(finalstatevalue)─┬─sumMerge(finalstatecount)─┐
│ 2018-10-18 │ process1 │ 2018-10-18 14:18:15 │ last state │ 10 │ 2.2 │ 2 │
│ 2018-10-18 │ process2 │ 2018-10-18 14:18:15 │ last state │ 12 │ 6.6 │ 2 │
│ 2018-10-19 │ process1 │ 2018-10-19 14:17:40 │ last state │ 19 │ 2.2 │ 2 │
│ 2018-10-19 │ process2 │ 2018-10-19 14:17:40 │ last state │ 17 │ 6.6 │ 2 │
└────────────┴──────────┴──────────────────────────┴─────────────────────────┴────────────────────────────┴───────────────────────────┴───────────────────────────┘
-- Final state of each process, across all days
-- Fold every day's partial states into one row per process: the latest day,
-- the latest state across all days, and the all-time totals.
-- Terminated with ';' like the other statements so the script runs as a
-- multi-statement batch.
SELECT
    max(statedate),
    process,
    maxMerge(finalstatetime),
    argMaxMerge(finalstate),
    argMaxMerge(finalstateint),
    sumMerge(finalstatevalue),
    sumMerge(finalstatecount)
FROM final_states_by_day
GROUP BY process
ORDER BY process ASC;
┌─max(statedate)─┬─process──┬─maxMerge(finalstatetime)─┬─argMaxMerge(finalstate)─┬─argMaxMerge(finalstateint)─┬─sumMerge(finalstatevalue)─┬─sumMerge(finalstatecount)─┐
│ 2018-10-19 │ process1 │ 2018-10-19 14:17:40 │ last state │ 19 │ 4.4 │ 4 │
│ 2018-10-19 │ process2 │ 2018-10-19 14:17:40 │ last state │ 17 │ 13.2 │ 4 │
└────────────────┴──────────┴──────────────────────────┴─────────────────────────┴────────────────────────────┴───────────────────────────┴───────────────────────────┘
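-- If you don't need to keep the raw events, create states_raw with the Null engine instead:
-- inserted rows are discarded, but the materialized view still receives them.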
-- Alternative definition of states_raw. Use it INSTEAD OF the MergeTree
-- definition, not in addition to it: states_raw would already exist and this
-- CREATE would fail. The Null engine stores nothing, but inserts still feed
-- the materialized view.
CREATE TABLE states_raw
(
    process    String,
    state      String,
    stateint   Int64,
    statevalue Float64,
    statetime  DateTime DEFAULT now()
)
ENGINE = Null;
-- After background merges, final_states_by_day eventually holds only one row per process per day;
-- OPTIMIZE ... FINAL forces those merges to happen now.
-- Force the pending merges now so each (process, statedate) key collapses to
-- a single row. The terminating ';' matters: without it the next SELECT is
-- parsed as part of this statement when the script runs as one batch.
OPTIMIZE TABLE final_states_by_day FINAL;
-- Raw rows are never collapsed: one row per inserted event.
SELECT count() FROM states_raw;
┌─count()─┐
│ 8 │
└─────────┘
-- Rows left in the rollup; after OPTIMIZE ... FINAL this is one per
-- (process, statedate).
SELECT count() FROM final_states_by_day;
┌─count()─┐
│ 4 │
└─────────┘
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment