Skip to content

Instantly share code, notes, and snippets.

@bibiboot
Last active October 1, 2016 00:09
Show Gist options
  • Save bibiboot/e96a00f1f558a7c73b556b45c894e357 to your computer and use it in GitHub Desktop.
Save bibiboot/e96a00f1f558a7c73b556b45c894e357 to your computer and use it in GitHub Desktop.
Kafka

[Download Kafka] (https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.0.0/kafka_2.10-0.10.0.0.tgz)

> tar xzf kafka-<VERSION>.tgz
> cd kafka-<VERSION>
> sbt update
> sbt package

Start single node zookeeper instance

> bin/zookeeper-server-start.sh config/zookeeper.properties

Start kafka server

> bin/kafka-server-start.sh config/server.properties

Create topic

> bin/kafka-topics.sh --list --zookeeper localhost:2181
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".
> bin/kafka-topics.sh --list --zookeeper localhost:2181
test

Write messages to topic

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This message# 1
This is message #2

Reading messages from topic

> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic test
This message# 1
This is message #2

Setting up a Maven project

Add dependency

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.8.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.2.1</version>
        </dependency>
    </dependencies>

Create a producer

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.HashMap;
import java.util.Map;

public class KafkaMessageProducer {

    public static void main(String[] args) {
        Map<String, Object> config = new HashMap<String, Object>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);

        int maxMessages = 1000;

        int count = 0;
        while(count < maxMessages) {
            producer.send(new ProducerRecord<String, String>("test", "msg", "message --- #"+count++));
            System.out.println("Message send.."+count);
        }
    }
    
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment