acks = 0 (no acknowledgement) => highest throughput, no delivery guarantee
acks = 1 (leader only) => leader acknowledges; data can still be lost if the leader fails before replication
acks = all (leader and in-sync replicas) => delivery guaranteed, but lower throughput
linger.ms
0 (default) => low latency, lower throughput (records sent immediately)
5 => higher throughput (larger batches), higher latency
max.in.flight.requests.per.connection
1 => lower throughput, strict ordering guarantee per partition
5 (default) => higher throughput; retries may reorder records unless enable.idempotence=true
batch.size
16384 (bytes, i.e. 16 KB, default) => higher throughput, higher memory usage
Lower values (e.g. 1024 bytes) => lower throughput, lower memory usage
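Taken together, a throughput-oriented producer configuration might look like the sketch below. Values are illustrative; enable.idempotence is an assumption added here to keep per-partition ordering with 5 in-flight requests.

```java
import java.util.Properties;

public class ThroughputProducerConfig {
    public static void main(String[] args) {
        // Sketch: throughput-oriented settings from the notes above.
        Properties props = new Properties();
        props.put("acks", "all");                // durability over raw throughput
        props.put("linger.ms", "5");             // wait up to 5 ms to fill a batch
        props.put("batch.size", "16384");        // 16 KB batches (the default)
        props.put("max.in.flight.requests.per.connection", "5");
        props.put("enable.idempotence", "true"); // preserves ordering despite retries
        System.out.println(props.getProperty("linger.ms"));
    }
}
```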
Consumer offset: auto.offset.reset = latest by default (read only new messages).
Max. number of consumers in a group = number of partitions in a topic
No. of partitions = Desired Throughput / Partition Speed
Partition speed ≈ 10 MB/s
e.g:
Desired throughput = 10 TB/day ≈ 120 MB/s
No. of partitions = 120 / 10 = 12
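The sizing rule above as a small worked calculation (10 MB/s per partition is the rough figure from these notes):

```java
public class PartitionSizing {
    public static void main(String[] args) {
        // 10 TB/day expressed in MB/s: 10e12 bytes / 1e6 / 86400 s ≈ 115.7 MB/s
        double desiredThroughputMBs = 10e12 / 1e6 / 86400;
        double partitionSpeedMBs = 10.0; // rough per-partition write speed
        int partitions = (int) Math.ceil(desiredThroughputMBs / partitionSpeedMBs);
        System.out.println(partitions); // → 12
    }
}
```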
-
Admin Client
i. Topics: Create, Update, Delete, Describe
ii. Access Control Lists: Create, Update, Delete, Describe
iii. Cluster: Describe
Solutions:
i. Conduktor ii. Kafka Tool iii. Burrow iv. Kafdrop v. CMAK (formerly Yahoo Kafka Manager)
-
Data Governance
i. Schemas ii. Naming Conventions iii. Encryption iv. Lineage v. Availability vi. Integrity
Solutions:
i. Confluent (confluent.io) ii. Axual (axual.io) iii. Lenses (lenses.io)
Kafka cannot delete individual records in place like a database.
Solution:
- Create a new topic.
- Create a streaming application to filter the messages into the new topic.
Tools: Kafka Streams, ksqlDB
Alternative (log-compacted topics): set the record's value to null; this tombstone deletes the key during compaction.
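For the compacted-topic route, "deleting" a key amounts to producing a tombstone. A minimal sketch, assuming a broker at broker-1:9092 and a hypothetical topic and key (requires the kafka-clients library):

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class TombstoneProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker-1:9092"); // placeholder broker
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Null value = tombstone: compaction removes this key from the topic.
            producer.send(new ProducerRecord<>("user-data", "user-42", null));
            producer.flush();
        }
    }
}
```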
-
Define properties
bootstrap.servers=broker-1:9092 key/value.serializer=StringSerializer acks=all
-
Create Producer
new KafkaProducer<>(properties)
-
Create Record(s)
new ProducerRecord<>(topic, key, value)
-
Send Record(s)
producer.send(record)
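The four producer steps above, combined into one minimal sketch. Broker address, topic, key, and value are placeholders; requires the kafka-clients library:

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class SimpleProducer {
    public static void main(String[] args) {
        // 1. Define properties
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker-1:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all");

        // 2. Create producer (try-with-resources closes it on exit)
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // 3. Create record
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("first_topic", "key-1", "hello kafka");
            // 4. Send (asynchronous; flush forces delivery before exit)
            producer.send(record);
            producer.flush();
        }
    }
}
```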
-
Define properties
bootstrap.servers=broker-1:9092 key/value.deserializer=StringDeserializer group.id=kafka.consumer
-
Create Consumer
new KafkaConsumer<>(properties)
-
Subscribe
consumer.subscribe(topics)
-
Poll
consumer.poll(Duration.ofMillis(100))
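The consumer steps as one minimal sketch (placeholder broker and topic; requires the kafka-clients library). Note the consumer takes deserializers, and poll takes a Duration timeout:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class SimpleConsumer {
    public static void main(String[] args) {
        // 1. Define properties
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker-1:9092");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("group.id", "kafka.consumer");

        // 2. Create consumer
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // 3. Subscribe
            consumer.subscribe(List.of("first_topic"));
            // 4. Poll in a loop
            while (true) {
                ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> r : records) {
                    System.out.printf("key=%s value=%s%n", r.key(), r.value());
                }
            }
        }
    }
}
```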
-
Define properties
bootstrap.servers=broker-1:9092
-
Create Admin Client
AdminClient.create(properties)
-
Perform Action
adminClient.createTopics(topics)
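The admin steps combined into a sketch (placeholder broker; requires the kafka-clients library). AdminClient is obtained via the static create factory, and topic creation takes a collection of NewTopic:

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.List;
import java.util.Properties;

public class CreateTopicExample {
    public static void main(String[] args) throws Exception {
        // 1. Define properties
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker-1:9092");

        // 2. Create admin client via the static factory method
        try (AdminClient admin = AdminClient.create(props)) {
            // 3. Perform action: create a topic with 3 partitions, replication factor 2
            NewTopic topic = new NewTopic("third_topic", 3, (short) 2);
            admin.createTopics(List.of(topic)).all().get(); // block until done
        }
    }
}
```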
Database Table -> Streaming Data
- Log-Based
By monitoring the transaction logs:
MySQL: binlog
PostgreSQL: write-ahead log (WAL)
MongoDB: op log
- Trigger-Based (Push mechanism)
Create triggers for After Insert, After Update and After Delete.
- Query-Based (Poll mechanism)
The streaming system queries the database at a fixed interval, e.g. every 100 ms.
More: Kafka Connect Fundamentals
enable.auto.commit = false
producer.flush();
consumer.commitSync();
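A consumer sketch putting these together: auto-commit disabled and offsets committed only after processing succeeds, giving at-least-once delivery. Broker, topic, and the process() helper are placeholders; requires the kafka-clients library:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class ManualCommitConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker-1:9092");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("group.id", "kafka.consumer");
        props.put("enable.auto.commit", "false"); // take control of offsets

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("first_topic"));
            while (true) {
                ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> r : records) {
                    process(r); // placeholder for your processing logic
                }
                // Commit only after processing succeeds => at-least-once delivery
                consumer.commitSync();
            }
        }
    }

    static void process(ConsumerRecord<String, String> r) { /* ... */ }
}
```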
Reason: not every system can use the Kafka clients directly:
-
No client: Programming language that does not have a Kafka Client implementation.
-
Legacy application: Old application that cannot use the Kafka Clients.
Solution:
Create a REST Proxy between the source system and Kafka.
-
Generate a new ID for your cluster:
${KAFKA_HOME}/bin/kafka-storage.sh random-uuid
-
Format the storage directory:
${KAFKA_HOME}/bin/kafka-storage.sh format -t <uuid> -c ${KAFKA_HOME}/config/kraft/server.properties
-
Launch the broker in daemon mode:
${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/kraft/server.properties
-
List all topics.
kafka-topics.sh --bootstrap-server localhost:9092 --list
-
Create a topic.
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic first_topic
-
Create a topic with 3 partitions.
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic second_topic --partitions 3
-
Create a topic with 3 partitions and a replication factor of 2.
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic third_topic --partitions 3 --replication-factor 2
-
Describe a topic.
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic second_topic
-
Delete a topic.
kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic first_topic
-
Send messages to a topic:
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first_topic
-
Send messages with keys to a topic:
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first_topic --property parse.key=true --property key.separator=:
-
Read from the end (new messages only):
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first_topic
-
Read from the beginning:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first_topic --from-beginning
-
Read with formatting:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first_topic --from-beginning --formatter kafka.tools.DefaultMessageFormatter --property print.timestamp=true --property print.key=true --property print.value=true