Skip to content

Instantly share code, notes, and snippets.

@piyushgarg-dev
Last active September 7, 2024 21:15
Show Gist options
  • Save piyushgarg-dev/32cadf6420c452b66a9a6d977ade0b01 to your computer and use it in GitHub Desktop.
Save piyushgarg-dev/32cadf6420c452b66a9a6d977ade0b01 to your computer and use it in GitHub Desktop.
Kafka Crash Course

Kafka

Video Link: Apache Kafka Crash Course | What is Kafka?

Prerequisite

Commands

  • Start Zookeper Container and expose PORT 2181.
docker run -p 2181:2181 zookeeper
  • Start Kafka Container, expose PORT 9092 and setup ENV variables.
docker run -p 9092:9092 \
-e KAFKA_ZOOKEEPER_CONNECT=<PRIVATE_IP>:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://<PRIVATE_IP>:9092 \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
confluentinc/cp-kafka

Code

client.js

const { Kafka } = require("kafkajs");

exports.kafka = new Kafka({
  clientId: "my-app",
  brokers: ["<PRIVATE_IP>:9092"],
});

admin.js

const { kafka } = require("./client");

async function init() {
  const admin = kafka.admin();
  console.log("Admin connecting...");
  admin.connect();
  console.log("Adming Connection Success...");

  console.log("Creating Topic [rider-updates]");
  await admin.createTopics({
    topics: [
      {
        topic: "rider-updates",
        numPartitions: 2,
      },
    ],
  });
  console.log("Topic Created Success [rider-updates]");

  console.log("Disconnecting Admin..");
  await admin.disconnect();
}

init();

producer.js

const { kafka } = require("./client");
const readline = require("readline");

const rl = readline.createInterface({
  input: process.stdin,
  output: process.stdout,
});

async function init() {
  const producer = kafka.producer();

  console.log("Connecting Producer");
  await producer.connect();
  console.log("Producer Connected Successfully");

  rl.setPrompt("> ");
  rl.prompt();

  rl.on("line", async function (line) {
    const [riderName, location] = line.split(" ");
    await producer.send({
      topic: "rider-updates",
      messages: [
        {
          partition: location.toLowerCase() === "north" ? 0 : 1,
          key: "location-update",
          value: JSON.stringify({ name: riderName, location }),
        },
      ],
    });
  }).on("close", async () => {
    await producer.disconnect();
  });
}

init();

consumer.js

const { kafka } = require("./client");
const group = process.argv[2];

async function init() {
  const consumer = kafka.consumer({ groupId: group });
  await consumer.connect();

  await consumer.subscribe({ topics: ["rider-updates"], fromBeginning: true });

  await consumer.run({
    eachMessage: async ({ topic, partition, message, heartbeat, pause }) => {
      console.log(
        `${group}: [${topic}]: PART:${partition}:`,
        message.value.toString()
      );
    },
  });
}

init();

Running Locally

  • Run Multiple Consumers
node consumer.js <GROUP_NAME>
  • Create Producer
node producer.js
> tony south
> tony north
@arshupadhyay
Copy link

arshupadhyay commented Apr 5, 2024

I am getting rebalancing error for two consumers. Do someone know how to solve this.

@A1996KASH
Copy link

I am getting rebalancing error for two consumers. Do someone know how to solve this.

Please share more details with screenshot

https://github.com/A1996KASH/apache-kafka-nodejs
It worked fine for me

@Akshaymk360
Copy link

south location is not visible even if i change my topic name in admin.js

@Akshaymk360
Copy link

I am not able to generate and see SOUTH location. Its showing only NORTH

I also had this same issue, what worked for me is

  • in admin.js file, update the topic name
  • run admin.js again
  • run producer and consumer again it worked for me

its not working for me bro

@Akshaymk360
Copy link

change the topic to something else like rider-status, I also had the same issue. i think it will keep the whatever code we deployed first so just change the topic. I hope it will work as intended.

this works .....but why......

should we change this in producer.js and consumer.js as well? or only in admin?

@Axhat
Copy link

Axhat commented May 21, 2024

I am not able to generate and see SOUTH location. Its showing only NORTH

I also had this same issue, what worked for me is

  • in admin.js file, update the topic name
  • run admin.js again
  • run producer and consumer again it worked for me

Working now! Thanks, Why do you think this was the case?

@skarthik05
Copy link

skarthik05 commented Jul 30, 2024

I am not able to generate and see SOUTH location. Its showing only NORTH

I also had this same issue, what worked for me is

  • in admin.js file, update the topic name
  • run admin.js again
  • run producer and consumer again it worked for me

Working now! Thanks, Why do you think this was the case?

In general, the topic name should remain the same throughout Kafka

  1. At the moment of creation
  2. When the producer is being produced
  3. At the moment of consuming

@Axhat I hope this helps

@pranayaapmor
Copy link

pranayaapmor commented Aug 22, 2024

Instead of <PRIVATE_IP> i want to use localhost, but its not working,(changed in kafka container and client.js)
is there any solution for this?

@Sonu-Hansda
Copy link

Instead of <PRIVATE_IP> i want to use localhost, but its not working,(changed in kafka container and client.js) is there any solution for this?

When running Kafka in Docker, you can't use localhost or 127.0.0.1 as the value for KAFKA_ZOOKEEPER_CONNECT and KAFKA_ADVERTISED_LISTENERS. This is because localhost and 127.0.0.1 refer to the container's own loopback interface, not the host machine's IP address. So you have to use your own IPv4 address

@rohhann12
Copy link

rohhann12 commented Sep 7, 2024

Instead of <PRIVATE_IP> i want to use localhost, but its not working,(changed in kafka container and client.js) is there any solution for this?

When running Kafka in Docker, you can't use localhost or 127.0.0.1 as the value for KAFKA_ZOOKEEPER_CONNECT and KAFKA_ADVERTISED_LISTENERS. This is because localhost and 127.0.0.1 refer to the container's own loopback interface, not the host machine's IP address. So you have to use your own IPv4 address.

for this you can use -p and expose your machine 's ports

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment