Date: 2022-11-08
Test the failure modes of using the Kafka table error handling approach suggested by this Altinity doc and this ClickHouse PR.
CREATE TABLE destination
(
    str String,
    i Int64,
    timestamp DateTime
)
ENGINE = MergeTree()
ORDER BY timestamp
CREATE TABLE errors
(
    timestamp DateTime,
    topic String,
    partition Int64,
    offset Int64,
    raw String,
    error String
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY (topic, toStartOfDay(timestamp), partition, offset)
CREATE TABLE kafka_destination
(
    str String,
    i Int64,
    timestamp DateTime
)
ENGINE = Kafka(
    'kafka:9092',
    'yakko-test-topic',
    'yakko-test-group-2022-11-08',
    'JSONEachRow'
)
SETTINGS kafka_handle_error_mode = 'stream'
CREATE MATERIALIZED VIEW destination_mv TO destination
(
    str String,
    i Int64,
    timestamp DateTime
) AS
SELECT
    str,
    i,
    timestamp
FROM kafka_destination
CREATE MATERIALIZED VIEW errors_mv TO errors
AS
SELECT
    _timestamp AS timestamp,
    _topic AS topic,
    _partition AS partition,
    _offset AS offset,
    _raw_message AS raw,
    _error AS error
FROM kafka_destination
WHERE length(_error) > 0
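With both MVs in place, failed messages can be pulled straight from the errors table; a sketch of an inspection query (table and column names as defined above):

```sql
-- Most recent parse failures, newest first
SELECT timestamp, topic, partition, offset, raw, error
FROM errors
ORDER BY timestamp DESC
LIMIT 10
```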
- Create all the tables from the Schemas section
- Create the topic
# create
kafka-topics --create --topic yakko-test-topic --bootstrap-server localhost:9092
# verify
kafka-topics --list --bootstrap-server localhost:9092
- Produce a message
kafka-console-producer --topic yakko-test-topic --bootstrap-server localhost:9092
Example messages:
Valid
{ "str": "foo", "i": 15, "timestamp": "2022-11-01 00:00:00" }
Invalid DateTime
{ "str": "foo", "i": 15, "timestamp": "2022-11-01" }
Invalid string
{ "str": 99, "i": 15, "timestamp": "2022-11-01 00:00:00" }
Invalid integer
{ "str": "foo", "i": "bar", "timestamp": "2022-11-01 00:00:00" }
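To avoid pasting the messages one by one, they can be written to a file and piped to the producer in one shot (a sketch; the file path is arbitrary, and the guard skips the produce step when kafka-console-producer isn't on the PATH):

```shell
# Write the four example messages, one JSON object per line
printf '%s\n' \
  '{ "str": "foo", "i": 15, "timestamp": "2022-11-01 00:00:00" }' \
  '{ "str": "foo", "i": 15, "timestamp": "2022-11-01" }' \
  '{ "str": 99, "i": 15, "timestamp": "2022-11-01 00:00:00" }' \
  '{ "str": "foo", "i": "bar", "timestamp": "2022-11-01 00:00:00" }' \
  > /tmp/yakko-test-messages.jsonl

# Pipe them to the topic in one go (skipped if the CLI isn't installed)
if command -v kafka-console-producer >/dev/null 2>&1; then
  kafka-console-producer --topic yakko-test-topic \
    --bootstrap-server localhost:9092 < /tmp/yakko-test-messages.jsonl
fi
```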
- Truncate tables after each test
TRUNCATE TABLE destination
TRUNCATE TABLE errors
Test 1
Do: Produce all 4 example messages to the topic
Result
- 3 rows in errors
- 4 rows in destination, 3 of them with empty values
Verdict
✅ OK
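The "3 with empty values" rows are the malformed messages: in stream mode they still flow through the main MV into destination, just with default values for every column. A sketch of the check (assuming no legitimate row has all-default values):

```sql
-- Rows that arrived with all-default values, i.e. failed parses
SELECT count()
FROM destination
WHERE str = '' AND i = 0
```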
Test 2a
- Produce one valid message and one error
- Detach the main MV
- Produce one valid message and one error
Result
- errors gets populated
- destination doesn't get populated
Verdict
✅ OK
Test 2b
- Steps from 2a
- Reattach the main MV
Result
- destination doesn't backfill
- Producing new valid messages goes through
Verdict
Essentially this should behave just like the case above, but worth double checking.
Test 3a
- Produce one valid message and one error
- Detach the errors MV
- Produce one valid message and one error
Result
- destination gets populated
- errors doesn't get populated
Verdict
✅ OK
Test 3b
- Steps from 3a
- Reattach the errors MV
Result
- errors doesn't backfill
- Producing new messages goes through
Verdict
I suspect this is not a thing but let's test it out.
Test 4a
- Use the schemas above, except add the setting kafka_max_block_size = 1 to the Kafka table
- Create 10 destination tables + MVs following the schemas below
CREATE TABLE race1
(
    str String,
    offset Int64
)
ENGINE = MergeTree()
ORDER BY offset
CREATE MATERIALIZED VIEW race1_mv TO race1
(
    str String,
    offset Int64
) AS
SELECT
    str,
    _offset AS offset
FROM kafka_destination
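Creating the 10 tables and MVs by hand is tedious; a sketch that generates the DDL for race1..race10 following the race1 schema above (the output file name is arbitrary; apply with e.g. `clickhouse-client -mn < /tmp/race_ddl.sql`):

```shell
# Generate CREATE statements for race1..race10 and their MVs.
# Only the table/view names vary between iterations.
for n in $(seq 1 10); do
  cat <<SQL
CREATE TABLE race${n}
(
    str String,
    offset Int64
)
ENGINE = MergeTree()
ORDER BY offset;

CREATE MATERIALIZED VIEW race${n}_mv TO race${n}
(
    str String,
    offset Int64
) AS
SELECT
    str,
    _offset AS offset
FROM kafka_destination;
SQL
done > /tmp/race_ddl.sql
```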
- Produce a bunch of events to the topic
- Drop the Kafka table
- Check the offsets on each table
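The offset check in the last step can be done with a union over the race tables; a sketch covering the first three (extend to all 10):

```sql
-- In-sync tables should all report the same high-water mark
SELECT 'race1' AS tbl, max(offset) AS max_offset FROM race1
UNION ALL
SELECT 'race2', max(offset) FROM race2
UNION ALL
SELECT 'race3', max(offset) FROM race3
```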
Result
The tables always stayed in sync, despite producing tens of thousands of events multiple times, forcing the ClickHouse Kafka consumer to consume one message at a time, dropping the Kafka table at random times, etc.
This is not very thorough testing, but it seems to confirm the view of the world developed through reading the source code and documentation.
Verdict
✅ OK - tables likely don't go out of sync by themselves
Test 4b
Let's do the same as above, but introduce a broken MV to see how things behave.
CREATE TABLE race_broken
(
    str String,
    offset Int64
)
ENGINE = MergeTree()
ORDER BY offset
CREATE MATERIALIZED VIEW race_broken_mv TO race_broken
(
    str String,
    offset Int64
) AS
SELECT
    str,
    nonexistentcolumn
FROM kafka_destination
Result
Same as 4a. The broken MV gets ignored; ClickHouse doesn't even log it.
Verdict
✅ OK