@piyushgarg-dev
Last active April 18, 2024 04:56
Kafka Crash Course

Kafka

Video Link: Apache Kafka Crash Course | What is Kafka?

Prerequisite

Commands

  • Start the ZooKeeper container and expose port 2181.
docker run -p 2181:2181 zookeeper
  • Start Kafka Container, expose PORT 9092 and setup ENV variables.
docker run -p 9092:9092 \
-e KAFKA_ZOOKEEPER_CONNECT=<PRIVATE_IP>:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://<PRIVATE_IP>:9092 \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
confluentinc/cp-kafka

Code

client.js

const { Kafka } = require("kafkajs");

exports.kafka = new Kafka({
  clientId: "my-app",
  brokers: ["<PRIVATE_IP>:9092"],
});

admin.js

const { kafka } = require("./client");

async function init() {
  const admin = kafka.admin();
  console.log("Admin connecting...");
  // connect() returns a promise; await it before using the admin client.
  await admin.connect();
  console.log("Admin Connection Success...");

  console.log("Creating Topic [rider-updates]");
  await admin.createTopics({
    topics: [
      {
        topic: "rider-updates",
        numPartitions: 2,
      },
    ],
  });
  console.log("Topic Created Success [rider-updates]");

  console.log("Disconnecting Admin..");
  await admin.disconnect();
}

init();

producer.js

const { kafka } = require("./client");
const readline = require("readline");

const rl = readline.createInterface({
  input: process.stdin,
  output: process.stdout,
});

async function init() {
  const producer = kafka.producer();

  console.log("Connecting Producer");
  await producer.connect();
  console.log("Producer Connected Successfully");

  rl.setPrompt("> ");
  rl.prompt();

  rl.on("line", async function (line) {
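    const [riderName, location] = line.trim().split(/\s+/);
    // Expect "<name> <north|south>"; ignore lines missing either part.
    if (!riderName || !location) return;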
    await producer.send({
      topic: "rider-updates",
      messages: [
        {
          partition: location.toLowerCase() === "north" ? 0 : 1,
          key: "location-update",
          value: JSON.stringify({ name: riderName, location }),
        },
      ],
    });
  }).on("close", async () => {
    await producer.disconnect();
  });
}

init();
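
The routing rule in producer.js is easier to check in isolation when it is pulled into small pure functions. The sketch below is an illustration rather than part of the original gist: `parseRiderLine`, `partitionFor`, and `toMessage` are hypothetical helper names, and it assumes the topic has exactly two partitions, as requested in admin.js.

```javascript
// Hypothetical helpers (not in the original gist): pure functions mirroring
// the routing rule in producer.js so it can be exercised without a broker.

// Split "<name> <location>" input; returns null for malformed lines.
function parseRiderLine(line) {
  const [riderName, location] = line.trim().split(/\s+/);
  if (!riderName || !location) return null;
  return { riderName, location };
}

// Same rule as producer.js: "north" goes to partition 0, anything else to 1.
// Assumes the topic was created with numPartitions: 2.
function partitionFor(location) {
  return location.toLowerCase() === "north" ? 0 : 1;
}

// Build one entry for the `messages` array passed to producer.send().
function toMessage({ riderName, location }) {
  return {
    partition: partitionFor(location),
    key: "location-update",
    value: JSON.stringify({ name: riderName, location }),
  };
}
```

Inside the `line` handler, this could be used as `const parsed = parseRiderLine(line); if (parsed) await producer.send({ topic: "rider-updates", messages: [toMessage(parsed)] });`.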

consumer.js

const { kafka } = require("./client");
const group = process.argv[2];

async function init() {
  const consumer = kafka.consumer({ groupId: group });
  await consumer.connect();

  await consumer.subscribe({ topics: ["rider-updates"], fromBeginning: true });

  await consumer.run({
    eachMessage: async ({ topic, partition, message, heartbeat, pause }) => {
      console.log(
        `${group}: [${topic}]: PART:${partition}:`,
        message.value.toString()
      );
    },
  });
}

init();
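
Since the producer sends JSON, the consumer can decode it instead of printing the raw string. The following is a minimal sketch under that assumption; `describeMessage` is a hypothetical helper name, not something in the gist or in KafkaJS, and it falls back to the raw text when the value is not a JSON object.

```javascript
// Hypothetical helper (not in the original gist): turn one KafkaJS message
// into a log line. message.value is a Buffer, or null for an empty value.
function describeMessage(group, topic, partition, message) {
  const prefix = `${group}: [${topic}]: PART:${partition}:`;
  if (message.value === null) {
    return `${prefix} <empty>`;
  }
  const text = message.value.toString();
  let rider = null;
  try {
    rider = JSON.parse(text);
  } catch (err) {
    rider = null; // Not JSON; handled by the fallback below.
  }
  if (rider === null || typeof rider !== "object") {
    // Fall back to the raw text, which is what consumer.js prints.
    return `${prefix} ${text}`;
  }
  return `${prefix} ${rider.name} is at ${rider.location}`;
}
```

In consumer.js, the body of `eachMessage` could then become `console.log(describeMessage(group, topic, partition, message));`.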

Running Locally

  • Run Multiple Consumers
node consumer.js <GROUP_NAME>
  • Create Producer
node producer.js
> tony south
> tony north
@Ankit-Yadav-21

Change the topic to something else, like rider-status. I also had the same issue. I think Kafka keeps whatever we deployed first, so just change the topic name. I hope it will work as intended.

This worked for me.

@Yash3456

I am not able to generate and see the SOUTH location. It's showing only NORTH.

I also had this same issue, what worked for me is

* in admin.js file, update the topic name

* run admin.js again

* run producer and consumer again; it worked for me

It worked for me as well

@Yash3456

It worked well. The admin script needs to be updated with the new topic name, else it won't work.

@crest-harikrushn

Getting this error while running the command:

INFO SASL config status: Will not attempt to authenticate using SASL (unknown error).

sudo docker run -p 9092:9092 \ -e KAFKA_ZOOKEEPER_CONNECT=192.168.43.70:2181 \ -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.43.70:9092 \ -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \ confluentinc/cp-kafka

Please help me with this.

Remove the \ characters and run the complete command on one line:

docker run -p 9092:9092 -e KAFKA_ZOOKEEPER_CONNECT=192.168.43.70:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.43.70:9092 -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 confluentinc/cp-kafka

This worked for me. Thanks 👍 😎

@bhumit070

bhumit070 commented Mar 12, 2024

If anyone needs a Docker Compose file:

version: "3"

services:
    zookeeper:
        image: zookeeper
        container_name: zookeeper
        ports:
            - "2181:2181"

    kafka:
        image: confluentinc/cp-kafka
        depends_on:
            - zookeeper
        ports:
            - "9092:9092"
        expose:
            - "29092"
        environment:
            KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
            KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
            KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
            KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
            KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
            KAFKA_MIN_INSYNC_REPLICAS: "1"

    kafka-ui:
        container_name: kafka-ui
        image: provectuslabs/kafka-ui
        ports:
            - 8080:8080
        environment:
            DYNAMIC_CONFIG_ENABLED: "true"

Also, if you get an error saying the port received NaN, write the broker address as localhost:9092 instead of http://localhost:9092.

Edit: added Kafka UI
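
The NaN port error mentioned above fits with KafkaJS expecting each broker as a bare `host:port` string rather than a URL. A minimal sketch of a guard for this, assuming a hypothetical helper named `toBrokerAddress` (it is not part of KafkaJS), strips any scheme and validates the port before the value reaches the `brokers` array in client.js:

```javascript
// Hypothetical helper (not part of KafkaJS): accept "http://localhost:9092"
// or "localhost:9092" and return the bare "host:port" form.
function toBrokerAddress(input) {
  const bare = input
    .trim()
    .replace(/^[a-z][a-z0-9+.-]*:\/\//i, "") // drop a leading scheme such as http://
    .replace(/\/+$/, ""); // drop trailing slashes
  const idx = bare.lastIndexOf(":");
  const host = idx === -1 ? "" : bare.slice(0, idx);
  const port = idx === -1 ? NaN : Number(bare.slice(idx + 1));
  if (!host || !Number.isInteger(port) || port < 1 || port > 65535) {
    throw new Error(`Expected "host:port", got "${input}"`);
  }
  return `${host}:${port}`;
}
```

For example, `brokers: [toBrokerAddress("http://localhost:9092")]` would pass `localhost:9092` to the client, while an address with no port fails fast with a readable error.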

@Sagar-Puniyani

@bhumit070 good work creating the Docker Compose file

@jalakpatoliya

I got this error: Unable to find image ':2181' locally

I guess you forgot to use zookeeper at the end of it?
This is full line: docker run -p 2181:2181 zookeeper

@jalakpatoliya

jalakpatoliya commented Mar 20, 2024

@bhumit070

If anyone needs a Docker Compose file:

version: '3'

services:
  zookeeper:
    image: zookeeper
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092

Also, if you get an error saying the port received NaN, write the broker address as localhost:9092 instead of http://localhost:9092.

This is working like a charm, Thanks bro 😊

@jalakpatoliya

jalakpatoliya commented Mar 20, 2024

docker run -p 9092:9092 -e KAFKA_ZOOKEEPER_CONNECT=192.168.43.70:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.43.70:9092 -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 confluentinc/cp-kafka

I tried your solution but am still getting an error. Below is what I used:

sudo docker run -p 9092:9092 -e KAFKA_ZOOKEEPER_CONNECT=103.171.98.61:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://103.171.98.61:9092 -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 confluentinc/cp-kafka

I'm on Linux Mint, and used curl ifconfig to get the IP.

Error:

java.net.ConnectException: Connection refused
at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:777)
at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:344)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1289)
[2024-03-20 13:32:49,380] INFO Opening socket connection to server 103.171.98.61/103.171.98.61:2181. (org.apache.zookeeper.ClientCnxn)
[2024-03-20 13:32:49,381] INFO SASL config status: Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2024-03-20 13:32:49,387] WARN Session 0x0 for server 103.171.98.61/103.171.98.61:2181, Closing socket connection. Attempting reconnect except it is a SessionExpiredException. (org.apache.zookeeper.ClientCnxn)

@A1996KASH

A1996KASH commented Mar 22, 2024

I am not able to generate and see the SOUTH location. It's showing only NORTH.

I also had this same issue, what worked for me is

* in admin.js file, update the topic name

* run admin.js again

* run producer and consumer again; it worked for me

It worked for me as well

It worked well. The admin script needs to be updated with the new topic name, else it won't work.

Does anyone know the reason?
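
One likely explanation, offered as an assumption rather than something confirmed in this thread: if rider-updates already existed with a single partition (for example, because the broker auto-created it before admin.js ran), `admin.createTopics` leaves the existing topic unchanged and resolves to `false`, so partition 1 never exists and only messages routed to partition 0 (NORTH) arrive. A new topic name sidesteps this because the new topic really is created with `numPartitions: 2`. The sketch below checks the partition count and grows it using the KafkaJS admin calls `fetchTopicMetadata` and `createPartitions`; `ensurePartitions` is a hypothetical helper name, and `admin` is assumed to be a connected `kafka.admin()` instance.

```javascript
// Hypothetical helper (not in the original gist): make sure `topic` has at
// least `wanted` partitions. `admin` is assumed to be a connected KafkaJS
// admin client, i.e. the result of kafka.admin() after await admin.connect().
async function ensurePartitions(admin, topic, wanted) {
  const { topics } = await admin.fetchTopicMetadata({ topics: [topic] });
  const current = topics[0].partitions.length;
  if (current >= wanted) return current;
  // Kafka can only increase a topic's partition count, never reduce it.
  await admin.createPartitions({
    topicPartitions: [{ topic, count: wanted }],
  });
  return wanted;
}
```

In admin.js this could run after the `createTopics` call, as `await ensurePartitions(admin, "rider-updates", 2);`, so that an existing topic keeps its name.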

@dr4g0n7ly

I need to run the consumer and producer on different servers. Can someone let me know what changes I should implement to set that up?

@A1996KASH

I need to run the consumer and producer on different servers. Can someone let me know what changes I should implement to set that up?

Yes. Simply run the producer file on a different server; you will also have to copy client.js to that server.

@dr4g0n7ly

I need to run the consumer and producer on different servers. Can someone let me know what changes I should implement to set that up?

Yes. Simply run the producer file on a different server; you will also have to copy client.js to that server.

Thank you for replying but I suspect I will have issues with the docker files and with establishing a connection from the producer to the consumer. Should I be worried about that?

@A1996KASH

I need to run the consumer and producer on different servers. Can someone let me know what changes I should implement to set that up?

Yes. Simply run the producer file on a different server; you will also have to copy client.js to that server.

Thank you for replying but I suspect I will have issues with the docker files and with establishing a connection from the producer to the consumer. Should I be worried about that?

No, I don't think so. Try it and share the git repo; I will check and try to help.
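
For the two-server setup discussed above, the producer and consumer never connect to each other; each connects to the Kafka broker, so what matters is that the address in KAFKA_ADVERTISED_LISTENERS is reachable from both machines. A minimal sketch, assuming a hypothetical `KAFKA_BROKERS` environment variable (KafkaJS does not read it on its own) and a hypothetical helper `brokersFromEnv`, lets the same client.js be copied to each server unchanged:

```javascript
// Hypothetical helper (not in the original gist): read a comma-separated
// broker list from an environment variable named KAFKA_BROKERS. The variable
// name is an assumption of this sketch, not a KafkaJS convention.
function brokersFromEnv(env, fallback = "localhost:9092") {
  const raw = env.KAFKA_BROKERS || fallback;
  return raw
    .split(",")
    .map((broker) => broker.trim())
    .filter((broker) => broker.length > 0);
}

// In client.js this would feed the Kafka constructor, for example:
//   exports.kafka = new Kafka({
//     clientId: "my-app",
//     brokers: brokersFromEnv(process.env),
//   });
```

Each server would then start its script with the broker address set, for example `KAFKA_BROKERS=<PRIVATE_IP>:9092 node producer.js`.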

@arshupadhyay

arshupadhyay commented Apr 5, 2024

I am getting a rebalancing error with two consumers. Does anyone know how to solve this?

@A1996KASH

I am getting a rebalancing error with two consumers. Does anyone know how to solve this?

Please share more details with a screenshot.

https://github.com/A1996KASH/apache-kafka-nodejs
It worked fine for me
