title: Kafka for Developers - Part 1
abstract: Install Kafka on your dev machine, and check that it is working.
keywords: Kafka, Developers, Spring Boot
weblogName: Medium
postStatus: draft
postDate: 2020-10-04 15:44:56 +0100

Kafka for Developers

I wrote this blog mainly as a way for me to organise my understanding of Kafka, and to do it in a logical step-by-step way. It explains how you can install a local Kafka broker for application development, and how to use Kafka in Spring Boot applications. If I don't run out of enthusiasm, I plan to cover the following over several articles:

  • Installing Kafka on your development machine
  • Producers, partitions, consumers
  • Testing with Embedded Kafka (using Spring Boot)
  • Serialisation and deserialisation
  • Authentication and encryption with TLS

In this first article we'll install Kafka, and we'll run a single test to verify that it is installed and that we can produce and receive events.

These articles will not teach you how to run a Kafka cluster suitable for production. They also don't go into detail about whether Kafka is right for your application. Although message queues and Kafka event streams have some overlapping functionality they are not the same. Some use cases are better served by a message queue, and some are better served by Kafka. There's a good explanation of the differences between message queues and event streams here: RabbitMQ vs Kafka.

Still here? Let's get started.

Running Kafka Locally

A production installation of Kafka includes a minimum of three message brokers plus Apache ZooKeeper. But for developing and testing an application on your local machine, you can get by with a single broker and a single ZooKeeper instance.

Rather than installing Kafka directly on your operating system, you can use Docker to pull down container images and run them. This has the great advantage that the instructions here work whether you are on Windows, macOS or Linux. You'll need Docker Desktop if you are on Windows or macOS, or Docker Engine if you are on Linux. If you are on Red Hat Enterprise Linux 8.0 or later, install Podman to download and run Docker images.

If you are running Windows 10 Home, you need to install Windows Subsystem for Linux 2 (WSL 2) before you install Docker Desktop.

Running Docker images also makes cleaning up much easier: delete the containers, volumes, and images and you've uninstalled everything cleanly. If you do a lot of prototyping and experimenting, it's very easy to remove everything and start again with a clean slate.
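For example, once you are running the docker-compose setup described below, something like the following removes everything again. This is a rough sketch; the image tags must match the ones in your docker-compose.yml:

# stop and remove the containers and their named volumes
docker-compose down -v

# remove the downloaded images
docker image rm bitnami/kafka:2-debian-10 bitnami/zookeeper:3-debian-10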

Once you have Docker installed, you can run Kafka using the bitnami/kafka image. Clone the GitHub repo https://github.com/cyberycon/kafka-howto to get all the example code for these articles.

The 00-docker directory contains a docker-compose.yml file, adapted from one in the bitnami-docker-kafka GitHub repo. This file starts a single broker and a single ZooKeeper instance. Treat it as a starting point to customise the installation for your needs. The file looks like this:

version: '2'

services:
  zookeeper:
    image: 'docker.io/bitnami/zookeeper:3-debian-10'
    ports:
      - '2181:2181'
    volumes:
      - 'zookeeper_data:/bitnami'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: 'docker.io/bitnami/kafka:2-debian-10'
    ports:
      - '9092:9092'
    volumes:
      - 'kafka_data:/bitnami'
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_LISTENERS=PLAINTEXT://:9092
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
    depends_on:
      - zookeeper

volumes:
  zookeeper_data:
    driver: local
  kafka_data:
    driver: local

To start your local Kafka instance running, run the following command in the same directory as docker-compose.yml:

docker-compose up -d

The -d option starts the containers detached from the command terminal. This enables you to carry on using the terminal, but you won't see any output from the container processes - so you won't see warnings or errors during startup. You can either omit the -d - in which case all the output from the containers' stdout and stderr is sent straight to your console - or you can use

docker logs bitnami-kafka_kafka_1

to display the logs from the Kafka container on the console. (The container name depends on the project name docker-compose derives from your working directory; run docker ps to see the exact name.)

These settings make it as simple as possible to start developing code with Kafka:

  • Topics are auto-created as soon as referenced by a producer
  • All communications are plain text (no encryption)
  • Broker and Zookeeper are exposed on your localhost with their normal default ports (9092 and 2181 respectively)
  • All data is held in local Docker volumes - remove the containers and their volumes (docker-compose down -v) and the data is gone.

These settings are good for development, but not for production. In a setup like this, data and messages are insecure and easily accessible to anyone.
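If you want a quick smoke test before touching any code, you can ask the broker to list its topics from inside the Kafka container. This is a sketch, assuming the usual script location in the Bitnami image; substitute the container name that docker ps shows on your machine:

# find the name of the running Kafka container
docker ps --format '{{.Names}}'

# list the topics the broker knows about (the list is empty on a fresh install)
docker exec -it <your-kafka-container> /opt/bitnami/kafka/bin/kafka-topics.sh \
    --bootstrap-server localhost:9092 --list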

Verify the installation is working

Build the code under 01-installation-test:

mvn package

This project doesn't have an application; it has a single test which attempts to send and receive some messages using the broker we just installed. If you see BUILD SUCCESS at the end of the Maven output, everything is working.

The verification project

I generated the verification project using the Spring Initializr. I set the Java version to 8, and the only dependency I added was for Kafka. The dependencies element in pom.xml looks like this:

<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
  </dependency>

  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
    <exclusions>
      <exclusion>
        <groupId>org.junit.vintage</groupId>
        <artifactId>junit-vintage-engine</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <scope>test</scope>
  </dependency>
</dependencies>

The source code for the test is below.

package com.example.cyberycon.kafkaconnectivity;

// imports omitted from listing

class KafkaConnectivityApplicationTests {
    
    Logger logger = LoggerFactory.getLogger(KafkaConnectivityApplicationTests.class); 

    private final static String TEST_TOPIC = "topic-1"; 
    
    @Test
    public void testSendAndReceive() throws Exception {
        KafkaTemplate<Integer, String> template = createTemplate();
        template.setDefaultTopic(TEST_TOPIC);
        template.sendDefault(0, "foo");
        template.sendDefault(0, "bar");
        template.flush();
        
        // If the listener starts running before the messages have been sent, the topic
        // does not exist, and the tight loop in the listener prevents the topic from
        // being created.
        Thread.sleep(1000);
        
        ContainerProperties containerProps = new ContainerProperties(TEST_TOPIC);
        final CountDownLatch latch = new CountDownLatch(2);
        containerProps.setMessageListener(new MessageListener<Integer, String>() {

            @Override
            public void onMessage(ConsumerRecord<Integer, String> message) {
                logger.info("received: " + message);
                latch.countDown();
            }

        });
        KafkaMessageListenerContainer<Integer, String> container = createContainer(containerProps);
        container.setBeanName("testAuto");
        container.start();
        assertTrue(latch.await(60, TimeUnit.SECONDS));
        container.stop();
        logger.info("Stop auto");

    }

    private KafkaMessageListenerContainer<Integer, String> createContainer(ContainerProperties containerProps) {
        Map<String, Object> props = consumerProps();
        DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(props);
        KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf,
                containerProps);
        return container;
    }

    private KafkaTemplate<Integer, String> createTemplate() {
        Map<String, Object> senderProps = senderProps();
        ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<Integer, String>(senderProps);
        KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
        return template;
    }

    private Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "xxx");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    private Map<String, Object> senderProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
}

This class contains a single test and some helper methods. The test does the following:

  1. Creates a KafkaTemplate (a Spring Framework abstraction for producing messages).
  2. Sends two messages to the default topic, "topic-1". This has the effect of creating the topic if it doesn't already exist (that is how we configured the Kafka broker in the previous section).
  3. Creates a CountDownLatch with a count of 2.
  4. Defines a listener on "topic-1". The onMessage() method is called each time the listener receives a new message, and invokes countDown() on the latch.
  5. Defines a listener container, adds the listener to it, and starts the container. Starting the container means the listener connects to the topic and starts waiting for messages.
  6. The latch.await(60, TimeUnit.SECONDS) inside the assert waits until the latch has either counted down to zero or until 60 seconds have passed. If the latch hasn't counted down in this time, the await method returns false and the test fails.

This code looks very simple, but the producer (KafkaTemplate) and consumer (MessageListener) run in different threads. This is why there is a Thread.sleep() before starting the message listener, and why we use a CountDownLatch to count the messages as they are received. If you would rather not rely on a fixed sleep, you can create the topic up front instead, as sketched below.
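One option (not part of the test above, just a sketch assuming the same localhost:9092 broker and the kafka-clients classes that spring-kafka already brings in) is to create the topic explicitly with Kafka's AdminClient before sending anything, so there is no race between the producer and the listener. The createTestTopic() name is a hypothetical helper you could add to the test class:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TopicExistsException;

// Hypothetical helper: create "topic-1" with one partition and replication factor 1
// before the test sends its first message.
private void createTestTopic() throws Exception {
    Map<String, Object> adminProps = new HashMap<>();
    adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    try (AdminClient admin = AdminClient.create(adminProps)) {
        NewTopic topic = new NewTopic(TEST_TOPIC, 1, (short) 1);
        admin.createTopics(Collections.singletonList(topic)).all().get();
    } catch (ExecutionException e) {
        // A TopicExistsException just means a previous run already created the topic.
        if (!(e.getCause() instanceof TopicExistsException)) {
            throw e;
        }
    }
}

Calling this helper at the start of testSendAndReceive() would let you drop both the Thread.sleep() and the reliance on broker-side topic auto-creation.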

In the next article, we'll look at how to use partitions and consumer groups.
