
Kafka Notes

Installation on Mac

  1. Install Kafka using Homebrew
$ brew install kafka

Start Kafka

  1. Start ZooKeeper
$ zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties
  2. Start Kafka
$ kafka-server-start /usr/local/etc/kafka/server.properties

Create a Kafka Topic using terminal

$ kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

Note: on newer Kafka versions (2.2+), kafka-topics can talk to the broker directly; use --bootstrap-server localhost:9092 instead of --zookeeper localhost:2181.

Create a producer using terminal

$ kafka-console-producer --broker-list localhost:9092 --topic test

You'll see something like this

kafka-console-producer --broker-list localhost:9092 --topic test
>1
>2
>3
>4

Each line you type and press Enter (here, I typed numbers in sequence) is put onto the Kafka topic test.

Create a consumer using terminal

$ kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning

You'll see something like the following

$ kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
1
2
3
4

Kafka Python

  1. Install Confluent Python client
    pip install confluent-kafka
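
To verify the installation, you can print the client and librdkafka versions (version() and libversion() are both part of the confluent-kafka API):

import confluent_kafka

# Each call returns a (version_string, version_int) tuple: one for the
# Python client, one for the underlying librdkafka library.
print(confluent_kafka.version())
print(confluent_kafka.libversion())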
    

Concepts

Message

Single unit of data that can be sent or received.

Message schemas

To Kafka, any message is just a byte array. We can give a message additional structure by defining a schema. A few available options are XML, JSON, Avro, and Protobuf.
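
A minimal sketch of this idea with JSON, reusing the local broker and test topic from above (the event fields are hypothetical): the broker never validates the structure, it is purely a convention between producer and consumer.

import json

from confluent_kafka import Producer

p = Producer({'bootstrap.servers': 'localhost:9092'})

# Kafka only sees bytes; the JSON structure is an agreement between
# producer and consumer, not something the broker enforces.
event = {'user_id': 42, 'action': 'login'}  # hypothetical payload
p.produce('test', json.dumps(event).encode('utf-8'))
p.flush()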

Producer/Publisher

An external process or application that produces messages.

Consumer/Subscriber

An external process or application that consumes messages.

Topic

A topic provides a way of categorizing the data being sent.

Partition

A topic can be further broken down into partitions. Each partition acts as a separate commit log. Ordering of messages is guaranteed only within a single partition. If no partition is specified and the topic has multiple partitions, messages will be written in round-robin fashion. A sketch of how keys interact with this is shown below.
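
A minimal sketch, assuming a topic with more than one partition (the test topic above was created with --partitions 1, so you would need one created with, say, --partitions 3 to observe the spread):

from confluent_kafka import Producer

p = Producer({'bootstrap.servers': 'localhost:9092'})

# Without a key, the client spreads messages across partitions.
p.produce('test', value='unkeyed message')

# With a key, all messages sharing that key hash to the same partition,
# so their relative order is preserved.
p.produce('test', key='user-42', value='first')
p.produce('test', key='user-42', value='second')
p.flush()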

Kafka Batch

Producing and delivering messages to Kafka one at a time creates a lot of overhead. A batch is a collection of messages produced for the same topic and partition. Batches can be compressed; see the configuration sketch below.
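
Batching is tuned through producer configuration. A sketch using librdkafka settings (linger.ms, batch.num.messages, and compression.type are real librdkafka properties; the values here are illustrative, not recommendations):

from confluent_kafka import Producer

p = Producer({
    'bootstrap.servers': 'localhost:9092',
    'linger.ms': 100,             # wait up to 100 ms for a batch to fill
    'batch.num.messages': 1000,   # cap on messages per batch
    'compression.type': 'gzip',   # compression is applied per batch
})

for i in range(10000):
    p.produce('test', 'message {}'.format(i).encode('utf-8'))
    p.poll(0)  # serve delivery callbacks as we go
p.flush()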

Consumer Groups

A consumer group consumes messages from a topic; each consumer in the group reads from one or more partitions of that topic. If you want the same message to be processed by two consumers, they must belong to separate consumer groups (see group1_consumer1.py, group1_consumer2.py, and group2_consumer1.py below).

Message Retention

Kafka by default retains messages based on time or total size. By default, the retention period is 7 days. TODO: Default size for retention?
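
Retention can also be overridden per topic. A sketch using the AdminClient from confluent-kafka (3 days is an arbitrary example value; note that alter_configs replaces all existing per-topic overrides, not just the one you pass):

from confluent_kafka.admin import AdminClient, ConfigResource

admin = AdminClient({'bootstrap.servers': 'localhost:9092'})

# Override retention for the 'test' topic to 3 days, in milliseconds.
resource = ConfigResource('topic', 'test',
                          set_config={'retention.ms': str(3 * 24 * 60 * 60 * 1000)})
futures = admin.alter_configs([resource])
futures[resource].result()  # returns None on success, raises on failure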

In-sync Replicas

A replica is in sync if it is no more than N messages or T seconds (default 10) behind the leader.

High Water Mark

The offset of the last message that was successfully replicated to all in-sync replicas. Consumers can only read messages up to the high water mark.

Kafka Architecture

Components

Broker

A single Kafka server within a cluster is called a broker.

Responsibilities
  • Receiving messages from producers
  • Assigning offsets
  • Committing messages to disk
  • Responding to consumers' fetch requests and serving messages

Cluster Controller

One broker in the cluster acts as the "cluster controller". TODO: Cluster controller failure??

Responsibilities
  • Assigning partitions to brokers
  • Monitoring for broker failures

Kafka Reliability Guarantees

Ordering guarantee

If two messages are produced by the same producer to the same partition, Kafka guarantees they are stored in the order they were produced.
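
One caveat: with retries enabled, a failed send can be retried after a later message has already succeeded, reordering the two; limiting in-flight requests or enabling idempotence avoids this. A sketch (enable.idempotence is a real librdkafka setting; the keys and values are illustrative):

from confluent_kafka import Producer

p = Producer({
    'bootstrap.servers': 'localhost:9092',
    'enable.idempotence': True,  # preserves ordering even when sends are retried
})

# Same key, same partition: the consumer sees 'created' before 'shipped'.
p.produce('test', key='order-42', value='created')
p.produce('test', key='order-42', value='shipped')
p.flush()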

Kafka Replication

Things to check

  1. Kafka Connect
  2. Kafka Connectors
  3. Kafka Streams

Things to try

  1. Use Kafka connector

Useful Resources

pom.xml

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.6.0</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.12.0</version>
        </dependency>

    </dependencies>

ProducerClient.java

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class ProducerClient {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producerClient = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
            producerClient.send(new ProducerRecord<>("test", Integer.toString(i), Integer.toString(i)));
            System.out.println("Message produced "+ i);
            Thread.sleep(1000);
        }

        producerClient.close();
    }
}

ConsumerClient.java

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class ConsumerClient {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "test");
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "1000");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
}

producer.py

import time

from confluent_kafka import Producer


p = Producer({'bootstrap.servers': 'localhost:9092'})


def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """

    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

i = 0
while True:
    # Trigger any available delivery report callbacks from previous produce() calls.
    # Unlike flush(), poll(0) does not block waiting for outstanding messages.
    p.poll(0)

    data = 'Data ' + str(i)
    i = i + 1

    time.sleep(1)

    # Asynchronously produce a message, the delivery report callback
    # will be triggered from poll() above, or flush() below, when the message has
    # been successfully delivered or failed permanently.
    p.produce('test', data.encode('utf-8'), callback=delivery_report)

group1_consumer1.py

from confluent_kafka import Consumer


c = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'mygroup',
    'auto.offset.reset': 'earliest'
})

c.subscribe(['test'])

while True:
    msg = c.poll(1.0)

    if msg is None:
        continue
    if msg.error():
        print("Consumer error: {}".format(msg.error()))
        continue

    print('Received message: Consumer 1 Group 1: {}'.format(msg.value().decode('utf-8')))

c.close()

group1_consumer2.py

from confluent_kafka import Consumer


c = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'mygroup',
    'auto.offset.reset': 'earliest'
})

c.subscribe(['test'])

while True:
    msg = c.poll(1.0)

    if msg is None:
        continue
    if msg.error():
        print("Consumer error: {}".format(msg.error()))
        continue

    print('Received message: Consumer 2 Group 1: {}'.format(msg.value().decode('utf-8')))

c.close()

group2_consumer1.py

from confluent_kafka import Consumer


c = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'mygroup2',
    'auto.offset.reset': 'earliest'
})

c.subscribe(['test'])

while True:
    msg = c.poll(1.0)

    if msg is None:
        continue
    if msg.error():
        print("Consumer error: {}".format(msg.error()))
        continue

    print('Received message: Consumer 1 Group 2: {}'.format(msg.value().decode('utf-8')))

c.close()