-- Event sourcing using AggregatingMergeTree (ClickHouse)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Reset the demo objects. The materialized view is dropped first so it never
-- refers to a source or target table that has already been removed.
DROP TABLE IF EXISTS final_states_by_day_mv;
DROP TABLE IF EXISTS final_states_by_day;
DROP TABLE IF EXISTS states_raw;
-- Raw event log: one row per recorded state of a process.
-- statetime falls back to the insert time when it is not supplied.
CREATE TABLE states_raw
(
    process    String,
    state      String,
    stateint   Int64,
    statevalue Float64,
    statetime  DateTime DEFAULT now()
)
ENGINE = MergeTree
ORDER BY (process, statetime);
-- Daily rollup per process. Columns hold partial aggregate *states*, not final
-- values: AggregatingMergeTree combines rows sharing the ORDER BY key
-- (process, statedate) during merges, and readers finish the aggregation with
-- the matching -Merge functions (maxMerge, argMaxMerge, sumMerge).
-- Function names are lowercase to match the maxState/argMaxState/sumState
-- states produced by final_states_by_day_mv.
CREATE TABLE final_states_by_day
(
    process         String,
    finalstatetime  AggregateFunction(max, DateTime),             -- latest statetime
    finalstate      AggregateFunction(argMax, String, DateTime),  -- state at latest statetime
    finalstateint   AggregateFunction(argMax, Int64, DateTime),   -- stateint at latest statetime
    finalstatevalue AggregateFunction(sum, Float64),              -- running sum of statevalue
    finalstatecount AggregateFunction(sum, UInt64),               -- number of events
    statedate       Date
)
ENGINE = AggregatingMergeTree
PARTITION BY toYYYYMM(statedate)
ORDER BY (process, statedate);
-- Runs this aggregation over each block inserted into states_raw and writes
-- the resulting partial states into final_states_by_day. Every insert is
-- aggregated on its own, so several rows per (process, statedate) can coexist
-- in the target until its parts are merged.
CREATE MATERIALIZED VIEW final_states_by_day_mv TO final_states_by_day AS
SELECT
    process,
    maxState(statetime)              AS finalstatetime,
    argMaxState(state, statetime)    AS finalstate,
    argMaxState(stateint, statetime) AS finalstateint,
    sumState(statevalue)             AS finalstatevalue,
    sumState(toUInt64(1))            AS finalstatecount,  -- counts events
    toDate(statetime)                AS statedate
FROM states_raw
GROUP BY
    process,
    statedate;
-- Two batches of sample events for two processes. In each batch the
-- "first state" is 100 seconds older than the "last state". The second batch
-- is shifted back one day (3600*24 seconds).
INSERT INTO states_raw (process, state, stateint, statevalue, statetime) VALUES
    ('process1', 'first state', 0, 1.1, now() - 100),
    ('process2', 'first state', 0, 3.3, now() - 100),
    ('process1', 'last state', 19, 1.1, now()),
    ('process2', 'last state', 17, 3.3, now());

INSERT INTO states_raw (process, state, stateint, statevalue, statetime) VALUES
    ('process1', 'first state', 0, 1.1, now() - 100 - 3600 * 24),
    ('process2', 'first state', 0, 3.3, now() - 100 - 3600 * 24),
    ('process1', 'last state', 10, 1.1, now() - 3600 * 24),
    ('process2', 'last state', 12, 3.3, now() - 3600 * 24);
-- List the raw events. The explicit ORDER BY (day, process, time) makes the
-- output order deterministic instead of depending on which part a row is in.
SELECT
    process,
    state,
    stateint,
    statevalue,
    statetime
FROM states_raw
ORDER BY
    toDate(statetime),
    process,
    statetime;
┌─process──┬─state───────┬─stateint─┬─statevalue─┬───────────statetime─┐ | |
│ process1 │ first state │ 0 │ 1.1 │ 2018-10-18 14:16:35 │ | |
│ process1 │ last state │ 10 │ 1.1 │ 2018-10-18 14:18:15 │ | |
│ process2 │ first state │ 0 │ 3.3 │ 2018-10-18 14:16:35 │ | |
│ process2 │ last state │ 12 │ 3.3 │ 2018-10-18 14:18:15 │ | |
│ process1 │ first state │ 0 │ 1.1 │ 2018-10-19 14:16:00 │ | |
│ process1 │ last state │ 19 │ 1.1 │ 2018-10-19 14:17:40 │ | |
│ process2 │ first state │ 0 │ 3.3 │ 2018-10-19 14:16:00 │ | |
│ process2 │ last state │ 17 │ 3.3 │ 2018-10-19 14:17:40 │ | |
└──────────┴─────────────┴──────────┴────────────┴─────────────────────┘ | |
-- Final state of each process per day
-- Finish the aggregation per (process, statedate). GROUP BY with the -Merge
-- functions is required: unmerged parts may still hold several partial rows
-- for the same key.
SELECT
    statedate,
    process,
    maxMerge(finalstatetime),
    argMaxMerge(finalstate),
    argMaxMerge(finalstateint),
    sumMerge(finalstatevalue),
    sumMerge(finalstatecount)
FROM final_states_by_day
GROUP BY
    process,
    statedate
ORDER BY
    statedate ASC,
    process ASC;
┌──statedate─┬─process──┬─maxMerge(finalstatetime)─┬─argMaxMerge(finalstate)─┬─argMaxMerge(finalstateint)─┬─sumMerge(finalstatevalue)─┬─sumMerge(finalstatecount)─┐ | |
│ 2018-10-18 │ process1 │ 2018-10-18 14:18:15 │ last state │ 10 │ 2.2 │ 2 │ | |
│ 2018-10-18 │ process2 │ 2018-10-18 14:18:15 │ last state │ 12 │ 6.6 │ 2 │ | |
│ 2018-10-19 │ process1 │ 2018-10-19 14:17:40 │ last state │ 19 │ 2.2 │ 2 │ | |
│ 2018-10-19 │ process2 │ 2018-10-19 14:17:40 │ last state │ 17 │ 6.6 │ 2 │ | |
└────────────┴──────────┴──────────────────────────┴─────────────────────────┴────────────────────────────┴───────────────────────────┴───────────────────────────┘ | |
-- Final state of each process across all days
-- Merge every day's partial states per process. argMaxMerge picks the value
-- attached to the latest statetime across all days, and the sums cover the
-- whole history. max(statedate) is the most recent day with events.
SELECT
    max(statedate),
    process,
    maxMerge(finalstatetime),
    argMaxMerge(finalstate),
    argMaxMerge(finalstateint),
    sumMerge(finalstatevalue),
    sumMerge(finalstatecount)
FROM final_states_by_day
GROUP BY process
ORDER BY process ASC;
┌─max(statedate)─┬─process──┬─maxMerge(finalstatetime)─┬─argMaxMerge(finalstate)─┬─argMaxMerge(finalstateint)─┬─sumMerge(finalstatevalue)─┬─sumMerge(finalstatecount)─┐ | |
│ 2018-10-19 │ process1 │ 2018-10-19 14:17:40 │ last state │ 19 │ 4.4 │ 4 │ | |
│ 2018-10-19 │ process2 │ 2018-10-19 14:17:40 │ last state │ 17 │ 13.2 │ 4 │ | |
└────────────────┴──────────┴──────────────────────────┴─────────────────────────┴────────────────────────────┴───────────────────────────┴───────────────────────────┘ | |
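-- If you don't need to keep the raw events, define states_raw with the Null
-- engine instead of MergeTree: inserted rows still reach the materialized view,
-- but they are not stored. With Null, SELECT count() FROM states_raw returns 0;
-- the count of 8 shown below comes from the MergeTree variant.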
-- Alternative definition of states_raw that stores nothing itself: rows
-- written to a Null table are discarded, and only attached materialized views
-- (here final_states_by_day_mv) see them. Use it instead of the MergeTree
-- definition. The DROP is needed because CREATE fails if states_raw already exists.
-- NOTE(review): if the server does not re-attach final_states_by_day_mv to
-- the recreated table, recreate the view after this statement as well.
DROP TABLE IF EXISTS states_raw;
CREATE TABLE states_raw
(
    process    String,
    state      String,
    stateint   Int64,
    statevalue Float64,
    statetime  DateTime DEFAULT now()
)
ENGINE = Null;
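-- Background merges eventually collapse final_states_by_day to one row per
-- process per day (its ORDER BY key). OPTIMIZE ... FINAL forces that merge now.
-- Until parts are merged, always read the table with GROUP BY and the -Merge
-- functions, as in the queries above.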
-- Merge all parts of every partition now instead of waiting for background merges.
OPTIMIZE TABLE final_states_by_day FINAL;
-- Number of raw events stored in states_raw.
SELECT count()
FROM states_raw;
┌─count()─┐ | |
│ 8 │ | |
└─────────┘ | |
-- Rows left in the rollup; after OPTIMIZE ... FINAL this is one per
-- (process, statedate).
SELECT count()
FROM final_states_by_day;
┌─count()─┐ | |
│ 4 │ | |
└─────────┘ |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment