Kafka Connect

ClickHouse

Events from a Kafka topic may be consumed by ClickHouse and stored. This is useful for building projections from events.

ClickHouse nodes must be added to a cluster in order to use ReplicatedMergeTree.

First, create the database my_db.
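For example (a minimal sketch, assuming the cluster is named my_cluster, as checked in the next section):

CREATE DATABASE IF NOT EXISTS my_db ON CLUSTER my_cluster;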

ClickHouse cluster

Check configured clusters:

select * from system.clusters;

Let's assume that its name is my_cluster.

Schema

ClickHouse must be configured to use the schema registry.
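One way to do this is to set format_avro_schema_registry_url in the user profile. A sketch, assuming a users.d override file and the registry URL used elsewhere in this gist (for older servers the root tag is <yandex>):

<clickhouse>
    <profiles>
        <default>
            <!-- used by the AvroConfluent format to resolve schemas -->
            <format_avro_schema_registry_url>http://some-schema-registry-host:8081</format_avro_schema_registry_url>
        </default>
    </profiles>
</clickhouse>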

In this example we stream user balance operations to a Kafka topic; the events match the schema userBalanceOperation-value:

{
  "type": "record",
  "name": "userBalanceOperation",
  "namespace": "some-namespace",
  "fields": [
    {
      "name": "id",
      "type": "long"
    },
    {
      "name": "userId",
      "type": "bytes"
    },
    {
      "name": "operationId",
      "type": "long"
    },
    {
      "name": "amount",
      "type": "long"
    },
    {
      "name": "createdAt",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    }
  ]
}

Kafka consumer table

Kafka events are consumed using a special table with ENGINE = Kafka:

CREATE TABLE my_db.ke_user_balance_operations ON CLUSTER my_cluster
(
    `id`          UInt64,
    `operationId` UInt32,
    `userId`      String,
    `createdAt`   DateTime,
    `amount`      Int64
)
ENGINE = Kafka 
SETTINGS
    kafka_broker_list = 'some-kafka-broker-host:9093', 
    kafka_topic_list = 'userBalanceOperation', 
    kafka_group_name = 'clickhouse-userBalanceOperation', 
    kafka_format = 'AvroConfluent';

Replicated storage for events

Events will be stored in a ReplicatedMergeTree table:

CREATE TABLE my_db.tb_user_balance_operations ON CLUSTER my_cluster
(
    `id`          UInt64,
    `operationId` UInt32,
    `userId`      String,
    `createdAt`   DateTime,
    `amount`      Int64
)
ENGINE = ReplicatedMergeTree('/clickhouse/{cluster}/tables/my_db.tb_user_balance_operations/shard_{shard}', '{replica}')
ORDER BY (id, createdAt)
SETTINGS index_granularity = 8192;

Materialized view

Events consumed from the Kafka table my_db.ke_user_balance_operations are stored in the replicated table my_db.tb_user_balance_operations through the materialized view my_db.mv_user_balance_operations.

CREATE MATERIALIZED VIEW my_db.mv_user_balance_operations ON CLUSTER my_cluster TO my_db.tb_user_balance_operations(
    `id` UInt64,
    `operationId` UInt32,
    `userId` String,
    `createdAt` DateTime,
    `amount` Int64
) 
    AS SELECT `id`, `operationId`, `userId`, `createdAt`, `amount` 
    FROM my_db.ke_user_balance_operations;
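Once the view is created, events should start flowing into the target table; a quick sanity check:

SELECT count() FROM my_db.tb_user_balance_operations;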

Dropping old tables:

DROP TABLE my_db.mv_user_balance_operations ON CLUSTER my_cluster SYNC;
DROP TABLE my_db.tb_user_balance_operations ON CLUSTER my_cluster SYNC; 
DROP TABLE my_db.ke_user_balance_operations ON CLUSTER my_cluster SYNC; 

If for some reason we need to re-read data from the beginning, we have to stop consuming events from Kafka using DETACH TABLE, reset the consumer group's offsets to the earliest position, and then reattach the table:

DETACH TABLE ke_user_balance_operations ON CLUSTER my_cluster;
/opt/kafka/bin/kafka-consumer-groups.sh \
    --bootstrap-server some-kafka-broker-host:9093 \
    --command-config command.properties \
    --group clickhouse-userBalanceOperation \
    --topic userBalanceOperation \
    --reset-offsets --to-earliest \
    --execute 

Contents of command.properties:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="SOME_USER" password="SOME-PASSWORD";

Then reattach the table:

ATTACH TABLE ke_user_balance_operations ON CLUSTER my_cluster;
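To inspect the group's current offsets and lag before or after the reset (same connection assumptions as above):

/opt/kafka/bin/kafka-consumer-groups.sh \
    --bootstrap-server some-kafka-broker-host:9093 \
    --command-config command.properties \
    --group clickhouse-userBalanceOperation \
    --describe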

Troubleshooting

Insert into too many partitions:

void DB::StorageKafka::threadFunc(size_t): Code: 252, e.displayText() = DB::Exception: Too many partitions for single 
INSERT block (more than 100). The limit is controlled by 'max_partitions_per_insert_block' setting. Large number 
of partitions is a common misconception. It will lead to severe negative performance impact, including slow server 
startup, slow INSERT queries and slow SELECT queries. Recommended total number of partitions for a table is under 
1000..10000. Please note, that partitioning is not intended to speed up SELECT queries (ORDER BY key is sufficient 
to make range queries fast). Partitions are intended for data manipulation (DROP PARTITION, etc).: while write prefix 
to view my_db.mv_user_balance_operations (af6e3c05-163a-4b04-af6e-3c05163aeb04), Stack trace (when copying this 
message, always include the lines below)

Check the current value of the setting:

select  * from system.settings where name ='max_partitions_per_insert_block' \G
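If a higher limit is acceptable, the setting can be raised in a user profile. A sketch, assuming a users.d override file (the default limit is 100):

<clickhouse>
    <profiles>
        <default>
            <max_partitions_per_insert_block>1000</max_partitions_per_insert_block>
        </default>
    </profiles>
</clickhouse>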

Consuming DECIMAL values

Decimal values are stored in Kafka as bytes.

Field declaration in Avro:

"name": "amount",
"type": [
    "null",
    {
      "type": "bytes",
      "scale": 2,
      "precision": 64,
      "logicalType": "decimal"
    }
]

ClickHouse's consumer then cannot decode this field into a ClickHouse Decimal, so currently there is no way to use decimals when consuming Kafka events into ClickHouse.
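A common workaround, used by the connector query later in this gist, is to produce the amount as an integer number of minor units (cents) and convert it back on read, for example:

SELECT id, toDecimal64(amount, 2) / 100 AS amount_decimal
FROM my_db.tb_user_balance_operations
LIMIT 10;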


Moving events from MySQL to Kafka

Kafka Connect in source mode makes it simple to produce events from different sources, such as databases.

Let's produce events from a MySQL table:

CREATE TABLE `userBalanceOperation` (
    `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
    `userId` binary(16) NOT NULL,
    `operationId` int(10) unsigned NOT NULL,
    `amount` decimal(10,2) NOT NULL,
    `createdAt` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (`id`),
    KEY `userId` (`userId`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

Installation

Kafka Connect must be configured to use JDBC sources:

Plugins

The plugins directory can be configured via the plugin.path parameter in the connector worker's config.

For io.confluent.connect.jdbc.JdbcSourceConnector: https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc

For io.confluent.connect.avro.AvroConverter: https://www.confluent.io/hub/confluentinc/kafka-connect-avro-converter

Configuring MySQL Adapter

MySQL JDBC driver (Connector/J): https://downloads.mysql.com/archives/c-j/

The driver JAR must be placed inside the lib directory of the kafka-connect-jdbc plugin.
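For example, with plugin.path=/usr/local/share/kafka/plugins (as in the worker config below), the layout ends up roughly like this (directory and version names are illustrative):

/usr/local/share/kafka/plugins/
    confluentinc-kafka-connect-jdbc/
        lib/
            kafka-connect-jdbc-10.x.x.jar
            mysql-connector-j-8.x.x.jar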

Running worker

LOG_DIR=/var/log ./connect-standalone.sh worker.properties connector.properties

We need to pass two configs: one for the worker and one for the connector.

Worker config worker.properties:

bootstrap.servers=some-kafka-broker-host:9093

producer.security.protocol=SASL_PLAINTEXT
producer.sasl.mechanism=SCRAM-SHA-256
producer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="SOME_USER" password="SOME_PASSWORD";

plugin.path=/usr/local/share/kafka/plugins
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://some-schema-registry-host:8081
key.converter.schemas.enable=true
value.converter.schemas.enable=true
value.converter.auto.register.schemas=false
#value.converter.use.latest.version=true
#value.converter.latest.compatibility.strict=false
offset.storage.file.filename=/tmp/connect.offsets

Connector config connector.properties:

name=UserBalanceOperationConnector
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
topic.prefix=userBalanceOperation
connection.url=jdbc:mysql://some-mysql-host:4306/someDbName
connection.user=SOME-USER
connection.password=SOME-PASS
mode=incrementing
incrementing.column.name=id
#table.whitelist=userBalanceOperation
errors.log.enable=true
errors.log.include.messages=true
tasks.max=1
poll.interval.ms=100000
offset.flush.interval.ms=100000
query=SELECT id, userId, operationId, FLOOR(amount * 100) as amount, UNIX_TIMESTAMP(createdAt) as createdAt FROM userBalanceOperation

Running the worker starts a REST server, so we can query the connector's status:

curl -s "http://localhost:8083/connectors/UserBalanceOperationConnector"
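The /status sub-resource of the Connect REST API shows the connector and task state:

curl -s "http://localhost:8083/connectors/UserBalanceOperationConnector/status"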

If query is not passed, Kafka Connect generates it automatically. A custom query must not contain a WHERE clause, because Kafka Connect appends its own WHERE clause to track the incrementing column.

If query is not passed, Kafka Connect produces events from all tables of the database; the table.whitelist property can be used to filter the tables.

Schema

Kafka Connect can register the schema in the registry automatically, but in some cases we want to create our own schema and disable automatic registration. This is done in the worker config:

value.converter.auto.register.schemas=false

userBalanceOperation-value

{
  "type": "record",
  "name": "userBalanceOperation",
  "namespace": "some-namespace",
  "fields": [
    {
      "name": "id",
      "type": "long"
    },
    {
      "name": "userId",
      "type": "bytes"
    },
    {
      "name": "operationId",
      "type": "long"
    },
    {
      "name": "amount",
      "type": "long"
    },
    {
      "name": "createdAt",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    }
  ]
}
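With auto-registration disabled, the schema has to be registered manually, for example via the Schema Registry REST API. A sketch: userBalanceOperation-value.json is a hypothetical file containing {"schema": "<the Avro schema above as an escaped JSON string>"}.

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data @userBalanceOperation-value.json \
    "http://some-schema-registry-host:8081/subjects/userBalanceOperation-value/versions"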

Permanently delete a subject from the schema registry:

curl -XDELETE "http://some-schema-registry-host:8081/subjects/userBalanceOperation-value?permanent=true"
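Note: recent Schema Registry versions accept a permanent (hard) delete only after a regular soft delete of the same subject, which is the same call without the permanent flag:

curl -XDELETE "http://some-schema-registry-host:8081/subjects/userBalanceOperation-value"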