
Kafka Notes

Concepts

Producing Data to Kafka

acks

0 (no acknowledgement) => High throughput, no delivery guarantee

1 (leader only) => Leader acknowledges the write before responding

all (leader and followers) => Delivery guaranteed, but lower throughput

linger.ms

0 (default) => Low latency, records sent immediately, little batching

5 => Higher throughput (more batching), higher latency

max.in.flight.requests.per.connection

1 => Lower throughput, strong ordering guarantee per partition

5 (default) => Higher throughput, ordering can be lost on retries

batch.size

16384 (bytes, default) => Higher throughput, higher memory usage

Smaller value (e.g. 1024 bytes) => Lower throughput, lower memory usage
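As a rough sketch, these knobs map onto producer configuration like this (the values are illustrative trade-off choices, not recommendations; assumes the Java client and a Properties object):

    Properties props = new Properties();
    props.put("acks", "all");                                  // wait for leader and followers: delivery guaranteed
    props.put("linger.ms", "5");                               // allow up to 5 ms of batching: throughput over latency
    props.put("max.in.flight.requests.per.connection", "1");   // keep strong per-partition ordering
    props.put("batch.size", "16384");                          // batch size in bytes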

Consuming Data from Kafka

Consumer offset: a new consumer starts reading from the latest (newest) offset by default.

Max. number of consumers in a group = number of partitions in a topic

No. of partitions = Desired Throughput / Partition Speed

Partition Speed ≈ 10 MB/s

e.g.:

Desired Throughput = 10 TB/day ≈ 120 MB/s

No. of Partitions = 120 / 10 = 12

Message Ordering

Guaranteed per partition only.

Set max.in.flight.requests.per.connection = 1 to preserve ordering when retries occur.

Managing Data Stream

  1. Admin Client

    i. Topics: Create, Update, Delete, Describe

    ii. Access Control Lists: Create, Update, Delete, Describe

    iii. Cluster: Describe

Solutions:

i. Conduktor
ii. Kafka Tool
iii. Burrow
iv. Kafdrop
v. Yahoo CMAK

  2. Data Governance

    i. Schemas
    ii. Naming Conventions
    iii. Encryption
    iv. Lineage
    v. Availability
    vi. Integrity

Solutions:

i. Confluent (confluent.io)
ii. Axual (axual.io)
iii. Lenses (lenses.io)

Deleting records

Individual records cannot be deleted as in a database.

Solution:

  1. Create a new topic.
  2. Create a streaming application to filter the messages into the new topic (see the sketch below).

Kafka Streams, KSQL
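A minimal Kafka Streams sketch of that filter step; the application id, topic names, and the filter condition are illustrative assumptions:

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStream;

    public class FilterToNewTopic {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "filter-app");                 // assumed application id
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092");
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

            StreamsBuilder builder = new StreamsBuilder();
            KStream<String, String> source = builder.stream("old_topic");                 // assumed source topic
            // Keep only the records that should survive; everything else is "deleted" by omission.
            source.filter((key, value) -> !"delete-me".equals(key))                       // assumed filter condition
                  .to("new_topic");                                                       // assumed target topic

            KafkaStreams streams = new KafkaStreams(builder.build(), props);
            streams.start();
            Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        }
    }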

Deleting Records on a Compacted Topic

Produce a record with the same key and a null value (a tombstone).
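A minimal sketch, assuming an existing producer and a compacted topic (topic name and key are illustrative):

    // A null value for an existing key is a tombstone; log compaction eventually removes the record.
    producer.send(new ProducerRecord<>("compacted_topic", "user-42", null));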

Setup Kafka Producer

  1. Define properties

    bootstrap.servers=broker-1:9092
    key.serializer=StringSerializer
    value.serializer=StringSerializer
    acks=all

  2. Create Producer

    new KafkaProducer<>(properties)

  3. Create Record(s)

    new ProducerRecord<>(topic, key, value)

  4. Send Record(s)

    producer.send(record)
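Putting the four steps together, a minimal producer sketch (topic, key, and value are placeholders):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class SimpleProducer {
        public static void main(String[] args) {
            // 1. Define properties
            Properties properties = new Properties();
            properties.put("bootstrap.servers", "broker-1:9092");
            properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            properties.put("acks", "all");

            // 2. Create Producer
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
                // 3. Create Record
                ProducerRecord<String, String> record =
                        new ProducerRecord<>("first_topic", "key-1", "hello kafka");
                // 4. Send Record (asynchronous; close/flush forces delivery)
                producer.send(record);
            }
        }
    }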

Setup Kafka Consumer

  1. Define properties

    bootstrap.servers=broker-1:9092
    key.deserializer=StringDeserializer
    value.deserializer=StringDeserializer
    group.id=kafka.consumer

  2. Create Consumer

    new KafkaConsumer<>(properties)

  3. Subscribe

    consumer.subscribe(topics)

  4. Poll

    consumer.poll(Duration.ofMillis(100))
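Putting the four steps together, a minimal consumer sketch (topic and group id are placeholders):

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class SimpleConsumer {
        public static void main(String[] args) {
            // 1. Define properties
            Properties properties = new Properties();
            properties.put("bootstrap.servers", "broker-1:9092");
            properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.put("group.id", "kafka.consumer");

            // 2. Create Consumer
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
                // 3. Subscribe
                consumer.subscribe(Collections.singletonList("first_topic"));
                // 4. Poll in a loop
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.printf("key=%s value=%s%n", record.key(), record.value());
                    }
                }
            }
        }
    }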

Setup Admin Client

  1. Define properties

    bootstrap.servers=broker-1:9092

  2. Create Admin Client

    AdminClient.create(properties)

  3. Perform Action

    adminClient.createTopics(Collections.singleton(newTopic))
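A minimal admin client sketch creating one topic (topic name, partition count, and replication factor are illustrative):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.NewTopic;

    public class SimpleAdmin {
        public static void main(String[] args) throws Exception {
            // 1. Define properties
            Properties properties = new Properties();
            properties.put("bootstrap.servers", "broker-1:9092");

            // 2. Create Admin Client
            try (AdminClient adminClient = AdminClient.create(properties)) {
                // 3. Perform action: create a topic with 3 partitions and replication factor 1
                NewTopic newTopic = new NewTopic("first_topic", 3, (short) 1);
                adminClient.createTopics(Collections.singleton(newTopic)).all().get();
            }
        }
    }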

Transforming non-streaming models

Database Table -> Streaming Data

Change Data Capture

  1. Log-Based

By monitoring the transaction logs:

MySQL: binlog

PostgreSQL: write-ahead log (WAL)

MongoDB: oplog

  2. Trigger-Based (Push mechanism)

Create triggers for After Insert, After Update and After Delete.

  3. Query-Based (Poll mechanism)

The streaming system queries the database periodically, e.g. every 100 ms (see the sketch below).
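A rough sketch of query-based CDC, assuming a JDBC-accessible table with an updated_at column; the connection URL, table, and topic names are illustrative, and a Kafka Connect JDBC source connector is the usual production approach:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.Timestamp;
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class QueryBasedCdc {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker-1:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            try (Connection db = DriverManager.getConnection("jdbc:postgresql://localhost/app", "user", "pass");
                 KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                Timestamp lastSeen = new Timestamp(0);
                PreparedStatement stmt = db.prepareStatement(
                        "SELECT id, payload, updated_at FROM orders WHERE updated_at > ? ORDER BY updated_at");
                while (true) {
                    stmt.setTimestamp(1, lastSeen);
                    try (ResultSet rs = stmt.executeQuery()) {
                        while (rs.next()) {
                            // Publish each changed row to Kafka and remember the newest timestamp seen.
                            producer.send(new ProducerRecord<>("orders_changes",
                                    rs.getString("id"), rs.getString("payload")));
                            lastSeen = rs.getTimestamp("updated_at");
                        }
                    }
                    Thread.sleep(100);   // poll interval, e.g. every 100 ms
                }
            }
        }
    }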

More: Kafka Connect Fundamentals

Synchronous Producing and Consuming

enable.auto.commit = false

producer.flush();

consumer.commitSync();
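A minimal sketch of the synchronous pattern, assuming an existing producer, a consumer configured with enable.auto.commit=false, and a hypothetical process() step:

    // Producer side: flush() blocks until all buffered records have been sent.
    producer.send(new ProducerRecord<>("first_topic", "key-1", "value-1"));
    producer.flush();

    // Consumer side: commit offsets synchronously only after processing succeeds.
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        process(record);   // hypothetical processing step
    }
    consumer.commitSync();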

REST

Reason: not every application can use a Kafka client directly:

  1. No client: Programming language that does not have a Kafka Client implementation.

  2. Legacy application: Old application that cannot use the Kafka Clients.

Solution:

Create a REST Proxy between the source system and Kafka.
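A minimal sketch of producing through a REST proxy, assuming Confluent's REST Proxy (v2 API) is running at localhost:8082; the topic and payload are illustrative:

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class RestProxyProducer {
        public static void main(String[] args) throws Exception {
            // One record with a JSON value, wrapped in the REST Proxy's envelope format.
            String body = "{\"records\":[{\"value\":{\"greeting\":\"hello\"}}]}";
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create("http://localhost:8082/topics/first_topic"))   // assumed proxy address
                    .header("Content-Type", "application/vnd.kafka.json.v2+json")
                    .POST(HttpRequest.BodyPublishers.ofString(body))
                    .build();
            HttpResponse<String> response = HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println(response.statusCode() + " " + response.body());
        }
    }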

Start Kafka using KRaft

  1. Generate a new ID for your cluster:

    ${KAFKA_HOME}/bin/kafka-storage.sh random-uuid

  2. Format the storage directory:

    ${KAFKA_HOME}/bin/kafka-storage.sh format -t <uuid> -c ${KAFKA_HOME}/config/kraft/server.properties

  3. Launch the broker itself in daemon mode:

    ${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/kraft/server.properties

More: Kafka with KRaft (without ZooKeeper) on Linux

Kafka Topics CLI

  1. List all topics.

    kafka-topics.sh --bootstrap-server localhost:9092 --list

  2. Create a topic.

    kafka-topics.sh --bootstrap-server localhost:9092 --create --topic first_topic

  3. Create a topic with 3 partitions.

    kafka-topics.sh --bootstrap-server localhost:9092 --create --topic second_topic --partitions 3

  4. Create a topic with 3 partitions and a replication factor of 2.

    kafka-topics.sh --bootstrap-server localhost:9092 --create --topic third_topic --partitions 3 --replication-factor 2

  5. Describe a topic.

    kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic second_topic

  6. Delete a topic.

    kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic first_topic

Kafka Console Producer CLI

  1. Send messages to a topic:

    kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first_topic

  2. Send messages with keys to a topic:

    kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first_topic --property parse.key=true --property key.separator=:

Kafka Console Consumer CLI

  1. Read from the end (new messages only):

    kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first_topic

  2. Read from the beginning:

    kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first_topic --from-beginning

  3. Read with formatting:

    kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first_topic --from-beginning --formatter kafka.tools.DefaultMessageFormatter --property print.timestamp=true --property print.key=true --property print.value=true

Acknowledgement

  1. Handling Streaming Data with a Kafka Cluster