Error handling for Kafka engine tables on ClickHouse

Date: 2022-11-08

Objective

Test the failure modes of the Kafka table error-handling approach suggested by this Altinity doc and this ClickHouse PR.

Schemas

Main table

CREATE TABLE destination
(
    str String,
    i Int64,
    timestamp DateTime
)
ENGINE = MergeTree()
ORDER BY timestamp

Errors table

CREATE TABLE errors
(
    timestamp DateTime,
    topic VARCHAR,
    partition Int64,
    offset Int64,
    raw VARCHAR,
    error VARCHAR
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY (topic, toStartOfDay(timestamp), partition, offset)

Kafka table

CREATE TABLE kafka_destination
(
    str String,
    i Int64,
    timestamp DateTime
)
ENGINE = Kafka(
    'kafka:9092',
    'yakko-test-topic',
    'yakko-test-group-2022-11-08',
    'JSONEachRow'
)
SETTINGS kafka_handle_error_mode = 'stream'
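
With kafka_handle_error_mode = 'stream', messages that fail to parse still yield a row: the declared columns get default values, while the virtual columns _error and _raw_message carry the failure details. To just eyeball the stream, a direct select works on versions supporting the stream_like_engine_allow_direct_select setting (a sketch; note that it consumes messages, advancing the group's offsets):

-- one-off peek at the stream; this advances the consumer offsets!
SET stream_like_engine_allow_direct_select = 1;

SELECT str, i, timestamp, _error, _raw_message
FROM kafka_destination
LIMIT 10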

Main MV

CREATE MATERIALIZED VIEW destination_mv TO destination
(
    str String,
    i Int64,
    timestamp DateTime
) AS
SELECT
    str,
    i,
    timestamp
FROM kafka_destination

Errors MV

CREATE MATERIALIZED VIEW errors_mv
TO errors
AS SELECT
    _timestamp AS timestamp,
    _topic AS topic,
    _partition AS partition,
    _offset AS offset,
    _raw_message AS raw,
    _error AS error
FROM kafka_destination
WHERE length(_error) > 0
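
Once this MV is in place, captured failures can be inspected straight from the errors table, for example:

SELECT timestamp, topic, partition, offset, error, raw
FROM errors
ORDER BY offset DESC
LIMIT 10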

Setup

  1. Create all the tables from the Schemas section
  2. Create the topic
# create
kafka-topics --create --topic yakko-test-topic --bootstrap-server localhost:9092

# verify
kafka-topics --list --bootstrap-server localhost:9092
  3. Produce a message
kafka-console-producer --topic yakko-test-topic --bootstrap-server localhost:9092

Example messages:

Valid

{ "str": "foo", "i": 15, "timestamp": "2022-11-01 00:00:00" }

Invalid DateTime

{ "str": "foo", "i": 15, "timestamp": "2022-11-01" }

Invalid string

{ "str": 99, "i": 15, "timestamp": "2022-11-01 00:00:00" }

Invalid integer

{ "str": "foo", "i": "bar", "timestamp": "2022-11-01 00:00:00" }
  4. Truncate tables after each test
TRUNCATE TABLE destination
TRUNCATE TABLE errors

Testing

1: Normal conditions

Do: Produce all 4 example messages to the topic

Result

  • 3 rows in errors
  • 4 rows in destination, 3 of them with default (empty) values

Verdict

✅ OK
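
The 3 extra rows in destination are the failed messages materializing with default column values. If destination should only receive rows that parsed cleanly, the main MV can filter on _error too, mirroring what the errors MV does; a minimal sketch:

CREATE MATERIALIZED VIEW destination_mv TO destination AS
SELECT
    str,
    i,
    timestamp
FROM kafka_destination
WHERE length(_error) = 0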

2: Detaching the main MV

2a: Detach

  1. Produce one valid message and one error
  2. Detach main MV
  3. Produce one valid message and one error

Result

  • errors gets populated
  • destination doesn't get populated

Verdict

✅ OK

2b: Reattach

  1. Steps from 2a
  2. Reattach the main MV

Result

  • destination doesn't backfill
  • New valid messages produced after reattaching go through

Verdict

⚠️ Beware! Either MV can move the offsets forward. We should always detach / drop the error MVs first!
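
In other words, to pause consumption safely, detach in an order where losing rows on the error side is the worst case; a sketch:

-- detach the errors MV first: if anything keeps consuming in between,
-- it's the main MV, so real data isn't silently dropped
DETACH TABLE errors_mv;
DETACH TABLE destination_mv;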

3: Detaching the error MV

This should behave essentially like case 2, but it's worth double-checking.

3a: Detach

  1. Produce one valid message and one error
  2. Detach the errors MV
  3. Produce one valid message and one error

Result

  • destination gets populated
  • errors doesn't get populated

Verdict

✅ OK

3b: Reattach

  1. Steps from 3a
  2. Reattach the errors MV

Result

  • errors doesn't backfill
  • New messages produced after reattaching go through

Verdict

⚠️ Same caveat as 2b: either MV can move the offsets forward, so always detach / drop the error MVs first!
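
To confirm from the Kafka side that the consumer group kept advancing while a MV was detached, the standard tooling can show the committed offsets:

kafka-consumer-groups --bootstrap-server localhost:9092 \
  --describe --group yakko-test-group-2022-11-08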

4: Race conditions

4a: Working tables

I suspect this is not a thing but let's test it out.

  1. Use the schemas above, except add the setting kafka_max_block_size=1 to the Kafka table
  2. Create 10 destination tables + MVs following the schemas below
CREATE TABLE race1
(
    str String,
    offset Int64
)
ENGINE = MergeTree()
ORDER BY offset

CREATE MATERIALIZED VIEW race1_mv TO race1
(
    str String,
    offset Int64
) AS
SELECT
    str,
    _offset AS offset
FROM kafka_destination
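
Rather than typing out ten near-identical statements, a shell loop can stamp them out (a sketch using clickhouse-client; default connection flags assumed):

# create race1..race10 plus their MVs
for i in $(seq 1 10); do
  clickhouse-client --query "
    CREATE TABLE race$i (str String, offset Int64)
    ENGINE = MergeTree() ORDER BY offset"
  clickhouse-client --query "
    CREATE MATERIALIZED VIEW race${i}_mv TO race$i AS
    SELECT str, _offset AS offset FROM kafka_destination"
done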
  3. Produce a bunch of events to the topic
  4. Drop the Kafka table
  5. Check the offsets on each table (see the query sketch below)
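
One way to compare the high-water mark across the tables is a UNION over them (first few shown; extend to all ten):

SELECT 'race1' AS tbl, max(offset) AS max_offset FROM race1
UNION ALL
SELECT 'race2', max(offset) FROM race2
UNION ALL
SELECT 'race3', max(offset) FROM race3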

Result

The tables were always in sync, despite producing tens of thousands of events multiple times, telling the ClickHouse Kafka consumer to consume one message at a time, dropping the table at random times, etc.

This isn't very thorough testing, but it matches the mental model developed from reading the source code and documentation.

Verdict

✅ OK - tables likely don't go out of sync by themselves

4b: Broken MV

Let's do the same as above but introduce a broken MV to see how things behave.

CREATE TABLE race_broken
(
    str String,
    offset Int64
)
ENGINE = MergeTree()
ORDER BY offset

CREATE MATERIALIZED VIEW race_broken_mv TO race_broken
(
    str String,
    offset Int64
) AS
SELECT
    str,
    nonexistentcolumn
FROM kafka_destination

Result

Same as 4a. The broken MV simply gets ignored; ClickHouse doesn't even log an error.

Verdict

✅ OK
