@piyushgarg-dev
Last active September 7, 2024 21:15
Kafka Crash Course

Kafka

Video Link: Apache Kafka Crash Course | What is Kafka?

Prerequisite

Commands

  • Start the ZooKeeper container and expose port 2181.
docker run -p 2181:2181 zookeeper
  • Start the Kafka container, expose port 9092, and set up the environment variables.
docker run -p 9092:9092 \
-e KAFKA_ZOOKEEPER_CONNECT=<PRIVATE_IP>:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://<PRIVATE_IP>:9092 \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
confluentinc/cp-kafka
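
The `<PRIVATE_IP>` placeholder in the commands above stands for the machine's private (LAN) address. As a rough sketch, Node's built-in `os` module can list candidates. The helper name `pickPrivateIPv4` is hypothetical, and it simply returns the first non-loopback IPv4 address, which may not be the right interface on machines with a VPN or several network cards.

```javascript
const os = require("os");

// Hypothetical helper: returns the first non-internal IPv4 address found in
// an interface map shaped like the result of os.networkInterfaces().
function pickPrivateIPv4(interfaces) {
  for (const addrs of Object.values(interfaces)) {
    for (const addr of addrs || []) {
      // Node has reported `family` as "IPv4" (string) or 4 (number)
      // depending on the version, so accept both.
      const isV4 = addr.family === "IPv4" || addr.family === 4;
      if (isV4 && !addr.internal) return addr.address;
    }
  }
  return null; // nothing suitable found
}

console.log(pickPrivateIPv4(os.networkInterfaces()));
```

The printed value is only a candidate; check it against your network settings before using it in the Kafka command.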

Code

client.js

const { Kafka } = require("kafkajs");

exports.kafka = new Kafka({
  clientId: "my-app",
  brokers: ["<PRIVATE_IP>:9092"],
});

admin.js

const { kafka } = require("./client");

async function init() {
  const admin = kafka.admin();
  console.log("Admin connecting...");
  await admin.connect();
  console.log("Admin Connection Success...");

  console.log("Creating Topic [rider-updates]");
  await admin.createTopics({
    topics: [
      {
        topic: "rider-updates",
        numPartitions: 2,
      },
    ],
  });
  console.log("Topic Created Success [rider-updates]");

  console.log("Disconnecting Admin..");
  await admin.disconnect();
}

init();
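
One detail worth knowing about admin.js: in kafkajs, `createTopics` resolves to `false` when the topic already exists, and in that case the existing topic is left as it is. The sketch below logs which case happened instead of always printing a success message. It takes an already connected admin client as `admin`; the helper name `createTopicVerbose` is hypothetical.

```javascript
// Sketch: `admin` is a connected kafkajs admin client (kafka.admin()).
// `createTopicVerbose` is a hypothetical helper name.
async function createTopicVerbose(admin, topic, numPartitions) {
  // createTopics() resolves to true when the topic was created and to
  // false when it already existed (the existing topic is not modified).
  const created = await admin.createTopics({
    topics: [{ topic, numPartitions }],
  });
  console.log(
    created
      ? `Created [${topic}] with ${numPartitions} partition(s)`
      : `[${topic}] already exists; nothing was changed`
  );
  return created;
}
```

Inside `init()` this could replace the direct call, for example `await createTopicVerbose(admin, "rider-updates", 2)`.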

producer.js

const { kafka } = require("./client");
const readline = require("readline");

const rl = readline.createInterface({
  input: process.stdin,
  output: process.stdout,
});

async function init() {
  const producer = kafka.producer();

  console.log("Connecting Producer");
  await producer.connect();
  console.log("Producer Connected Successfully");

  rl.setPrompt("> ");
  rl.prompt();

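  rl.on("line", async function (line) {
    const [riderName, location] = line.trim().split(/\s+/);
    // Expect "<riderName> <location>"; skip incomplete input instead of crashing.
    if (!riderName || !location) {
      console.log("Usage: <riderName> <north|south>");
      return;
    }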
    await producer.send({
      topic: "rider-updates",
      messages: [
        {
          partition: location.toLowerCase() === "north" ? 0 : 1,
          key: "location-update",
          value: JSON.stringify({ name: riderName, location }),
        },
      ],
    });
  }).on("close", async () => {
    await producer.disconnect();
  });
}

init();
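
The producer's input handling and partition choice can also be pulled out into small functions, which makes them easy to check without a running broker. This is only a sketch; `parseRiderUpdate` and `partitionFor` are hypothetical names that are not in the gist's code.

```javascript
// Hypothetical helpers mirroring the producer's inline logic.

// Splits "<name> <location>" and returns null for incomplete input.
function parseRiderUpdate(line) {
  const [name, location] = line.trim().split(/\s+/);
  if (!name || !location) return null;
  return { name, location };
}

// "north" (any casing) goes to partition 0, everything else to partition 1,
// matching the ternary in producer.js.
function partitionFor(location) {
  return location.toLowerCase() === "north" ? 0 : 1;
}

console.log(parseRiderUpdate("tony south"));
console.log(partitionFor("North"));
```

Note that any location other than "north" lands in partition 1, so the topic needs at least two partitions for those messages to be accepted.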

consumer.js

const { kafka } = require("./client");
const group = process.argv[2];

async function init() {
  const consumer = kafka.consumer({ groupId: group });
  await consumer.connect();

  await consumer.subscribe({ topics: ["rider-updates"], fromBeginning: true });

  await consumer.run({
    eachMessage: async ({ topic, partition, message, heartbeat, pause }) => {
      console.log(
        `${group}: [${topic}]: PART:${partition}:`,
        message.value.toString()
      );
    },
  });
}

init();
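
The consumer above prints the raw message string. Since producer.js sends JSON, a consumer that wants the fields could decode the value first. A minimal sketch, assuming messages shaped like the ones producer.js sends; `decodeRiderUpdate` is a hypothetical name.

```javascript
// Hypothetical helper: turns a kafkajs message into a plain object.
// message.value is a Buffer, or null for messages without a value.
function decodeRiderUpdate(message) {
  if (message.value === null || message.value === undefined) return null;
  try {
    return JSON.parse(message.value.toString());
  } catch (err) {
    return null; // not valid JSON; skip rather than crash the consumer
  }
}

// Example with a hand-built message object:
const sample = { value: Buffer.from(JSON.stringify({ name: "tony", location: "south" })) };
console.log(decodeRiderUpdate(sample));
```

Inside `eachMessage`, the result could be checked for `null` before it is used.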

Running Locally

  • Run Multiple Consumers
node consumer.js <GROUP_NAME>
  • Create Producer
node producer.js
> tony south
> tony north
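
Consumers started with the same `<GROUP_NAME>` share the topic's partitions between them, while consumers in different groups each receive every message. The snippet below is only a simplified illustration of that sharing: it deals partitions out round-robin and is not the assignor kafkajs actually runs. `assignPartitions` is a made-up name.

```javascript
// Simplified illustration only: deals partitions out to the consumers of
// ONE group in round-robin order. Real assignment is done by Kafka's group
// protocol, not by this function.
function assignPartitions(partitions, consumers) {
  if (consumers.length === 0) return {};
  const assignment = Object.fromEntries(consumers.map((c) => [c, []]));
  partitions.forEach((p, i) => {
    assignment[consumers[i % consumers.length]].push(p);
  });
  return assignment;
}

// rider-updates has 2 partitions (0 and 1):
console.log(assignPartitions([0, 1], ["c1"]));
console.log(assignPartitions([0, 1], ["c1", "c2"]));
console.log(assignPartitions([0, 1], ["c1", "c2", "c3"]));
```

With two partitions, a third consumer in the same group has nothing left to read, which is why the last call gives `c3` an empty list.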
@bhumit070

bhumit070 commented Mar 12, 2024

If anyone needs a Docker Compose file:

version: "3"

services:
    zookeeper:
        image: zookeeper
        container_name: zookeeper
        ports:
            - "2181:2181"

    kafka:
        image: confluentinc/cp-kafka
        depends_on:
            - zookeeper
        ports:
            - "9092:9092"
        expose:
            - "29092"
        environment:
            KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
            KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
            KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
            KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
            KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
            KAFKA_MIN_INSYNC_REPLICAS: "1"

    kafka-ui:
        container_name: kafka-ui
        image: provectuslabs/kafka-ui
        ports:
            - 8080:8080
        environment:
            DYNAMIC_CONFIG_ENABLED: "true"

Also, if you get an error saying the port received NaN, write the broker address as localhost:9092 instead of http://localhost:9092.
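
A small guard can catch this before the address reaches the `brokers` array in client.js. This is a sketch under the assumption that brokers are written as plain `host:port` strings; `toBrokerAddress` is a hypothetical helper, not part of kafkajs.

```javascript
// Hypothetical helper: strips an accidental scheme such as "http://" and
// checks that what remains looks like "host:port".
function toBrokerAddress(input) {
  const hostPort = input
    .trim()
    .replace(/^[a-z][a-z0-9+.-]*:\/\//i, "") // drop "http://", "https://", ...
    .replace(/\/+$/, ""); // drop trailing slashes
  const idx = hostPort.lastIndexOf(":");
  const host = idx === -1 ? "" : hostPort.slice(0, idx);
  const port = idx === -1 ? NaN : Number(hostPort.slice(idx + 1));
  if (!host || !Number.isInteger(port) || port < 1 || port > 65535) {
    throw new Error(`Expected "host:port", got "${input}"`);
  }
  return `${host}:${port}`;
}

console.log(toBrokerAddress("http://localhost:9092"));
```

It throws on input without a usable port, so a typo shows up as a clear message rather than a NaN port later on.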

Edit: added Kafka UI.

@Sagar-Puniyani

@bhumit070 good work creating the Docker Compose file.

@jalakpatoliya

I got this error: Unable to find image ':2181' locally

I guess you forgot to use zookeeper at the end of it?
This is full line: docker run -p 2181:2181 zookeeper

@jalakpatoliya

jalakpatoliya commented Mar 20, 2024

@bhumit070

If anyone needs a Docker Compose file:

version: '3'

services:
  zookeeper:
    image: zookeeper
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092

Also, if you get an error saying the port received NaN, write the broker address as localhost:9092 instead of http://localhost:9092.

This is working like a charm, Thanks bro 😊

@jalakpatoliya

jalakpatoliya commented Mar 20, 2024

docker run -p 9092:9092 -e KAFKA_ZOOKEEPER_CONNECT=192.168.43.70:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.43.70:9092 -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 confluentinc/cp-kafka

I tried your solution but am still getting an error. Below is what I used:

sudo docker run -p 9092:9092 -e KAFKA_ZOOKEEPER_CONNECT=103.171.98.61:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://103.171.98.61:9092 -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 confluentinc/cp-kafka

I'm on Linux Mint, and I used curl ifconfig to get the IP.

Error:

java.net.ConnectException: Connection refused
at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:777)
at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:344)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1289)
[2024-03-20 13:32:49,380] INFO Opening socket connection to server 103.171.98.61/103.171.98.61:2181. (org.apache.zookeeper.ClientCnxn)
[2024-03-20 13:32:49,381] INFO SASL config status: Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2024-03-20 13:32:49,387] WARN Session 0x0 for server 103.171.98.61/103.171.98.61:2181, Closing socket connection. Attempting reconnect except it is a SessionExpiredException. (org.apache.zookeeper.ClientCnxn)

@A1996KASH

A1996KASH commented Mar 22, 2024

I am not able to generate and see SOUTH location. It's showing only NORTH.

I had the same issue. What worked for me:

* In admin.js, update the topic name.

* Run admin.js again.

* Run the producer and consumer again.

It worked for me as well

It worked well. The admin needs to be updated with the new topic name; otherwise it won't work.

Does anyone know the reason?
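
One possible explanation, offered as an assumption rather than something confirmed in this thread: if `rider-updates` already existed with a single partition (for example, because it was auto-created when a producer or consumer ran before admin.js), `createTopics` would leave it untouched, so partition 1, where every non-north update is sent, would not exist. A new topic name sidesteps that because the topic is created fresh with two partitions. The sketch below checks the partition count and grows it if needed. It takes a connected kafkajs admin client as `admin`; `ensurePartitionCount` is a hypothetical name.

```javascript
// Sketch: `admin` is a connected kafkajs admin client (kafka.admin()).
// Returns the partition count found before any change was made.
async function ensurePartitionCount(admin, topic, wanted) {
  const { topics } = await admin.fetchTopicMetadata({ topics: [topic] });
  const current = topics[0].partitions.length;
  if (current < wanted) {
    // Kafka can only increase a topic's partition count, never reduce it.
    // `count` is the new total, not the number of partitions to add.
    await admin.createPartitions({
      topicPartitions: [{ topic, count: wanted }],
    });
  }
  return current;
}
```

Calling `await ensurePartitionCount(admin, "rider-updates", 2)` from admin.js would show whether the topic really had fewer than two partitions.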

@dr4g0n7ly

I need to run the consumer and producer on different servers. Can someone let me know what changes I should implement to set that up?

@A1996KASH

I need to run the consumer and producer on different servers. Can someone let me know what changes I should implement to set that up?

Yes. Simply run the producer file on a different server; you will also have to copy client.js onto that server.

@dr4g0n7ly

I need to run the consumer and producer on different servers. Can someone let me know what changes I should implement to set that up?

Yes. Simply run the producer file on a different server; you will also have to copy client.js onto that server.

Thank you for replying but I suspect I will have issues with the docker files and with establishing a connection from the producer to the consumer. Should I be worried about that?

@A1996KASH

I need to run the consumer and producer on different servers. Can someone let me know what changes I should implement to set that up?

Yes. Simply run the producer file on a different server; you will also have to copy client.js onto that server.

Thank you for replying but I suspect I will have issues with the docker files and with establishing a connection from the producer to the consumer. Should I be worried about that?

No. I don't think so. Try it and share the git repo I will check and try to help.
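
For the producer and consumer to run on different servers, both need a `brokers` entry that reaches the Kafka host, and the address in the broker's `KAFKA_ADVERTISED_LISTENERS` has to be reachable from both machines. One way to avoid editing client.js on each server is to read the broker list from an environment variable. The variable name `KAFKA_BROKERS` and the helper `brokersFromEnv` below are assumptions for illustration, not something the gist defines.

```javascript
// Hypothetical helper: reads a comma-separated broker list from an
// environment-like object, falling back to a default address.
function brokersFromEnv(env, fallback = "localhost:9092") {
  return (env.KAFKA_BROKERS || fallback)
    .split(",")
    .map((b) => b.trim())
    .filter(Boolean);
}

// In client.js this could feed the Kafka constructor:
//   new Kafka({ clientId: "my-app", brokers: brokersFromEnv(process.env) })
console.log(brokersFromEnv(process.env));
```

Each server would then be started with its own value, for example `KAFKA_BROKERS=<PRIVATE_IP>:9092 node producer.js`.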

@arshupadhyay

arshupadhyay commented Apr 5, 2024

I am getting a rebalancing error with two consumers. Does anyone know how to solve this?

@A1996KASH

I am getting a rebalancing error with two consumers. Does anyone know how to solve this?

Please share more details with a screenshot.

https://github.com/A1996KASH/apache-kafka-nodejs
It worked fine for me

@Akshaymk360

The south location is not visible even if I change the topic name in admin.js.

@Akshaymk360

I am not able to generate and see SOUTH location. It's showing only NORTH.

I had the same issue. What worked for me:

  • In admin.js, update the topic name.
  • Run admin.js again.
  • Run the producer and consumer again.

It's not working for me, bro.

@Akshaymk360

Change the topic to something else, like rider-status; I had the same issue. I think it keeps whatever was deployed first, so just change the topic. I hope it works as intended.

This works... but why?

Should we change this in producer.js and consumer.js as well, or only in admin.js?

@Axhat

Axhat commented May 21, 2024

I am not able to generate and see SOUTH location. It's showing only NORTH.

I had the same issue. What worked for me:

  • In admin.js, update the topic name.
  • Run admin.js again.
  • Run the producer and consumer again.

Working now, thanks! Why do you think this was the case?

@skarthik05

skarthik05 commented Jul 30, 2024

I am not able to generate and see SOUTH location. It's showing only NORTH.

I had the same issue. What worked for me:

  • In admin.js, update the topic name.
  • Run admin.js again.
  • Run the producer and consumer again.

Working now, thanks! Why do you think this was the case?

In general, the topic name must be the same everywhere it is used:

  1. When the topic is created
  2. When the producer sends messages
  3. When the consumer subscribes

@Axhat I hope this helps

@pranayaapmor

pranayaapmor commented Aug 22, 2024

Instead of <PRIVATE_IP> I want to use localhost, but it's not working (I changed it in the Kafka container and in client.js).
Is there any solution for this?

@Sonu-Hansda

Instead of <PRIVATE_IP> I want to use localhost, but it's not working (I changed it in the Kafka container and in client.js). Is there any solution for this?

When running Kafka in Docker, you can't use localhost or 127.0.0.1 as the value for KAFKA_ZOOKEEPER_CONNECT and KAFKA_ADVERTISED_LISTENERS. This is because localhost and 127.0.0.1 refer to the container's own loopback interface, not the host machine's IP address. So you have to use your own IPv4 address

@rohhann12

rohhann12 commented Sep 7, 2024

Instead of <PRIVATE_IP> I want to use localhost, but it's not working (I changed it in the Kafka container and in client.js). Is there any solution for this?

When running Kafka in Docker, you can't use localhost or 127.0.0.1 as the value for KAFKA_ZOOKEEPER_CONNECT and KAFKA_ADVERTISED_LISTENERS. This is because localhost and 127.0.0.1 refer to the container's own loopback interface, not the host machine's IP address. So you have to use your own IPv4 address.

For this, you can use -p to publish the container's ports on your machine.
