Skip to content

Instantly share code, notes, and snippets.

@piyushgarg-dev
Last active October 4, 2024 13:18
Show Gist options
  • Save piyushgarg-dev/32cadf6420c452b66a9a6d977ade0b01 to your computer and use it in GitHub Desktop.
Save piyushgarg-dev/32cadf6420c452b66a9a6d977ade0b01 to your computer and use it in GitHub Desktop.
Kafka Crash Course

Kafka

Video Link: Apache Kafka Crash Course | What is Kafka?

Prerequisite

Commands

  • Start Zookeper Container and expose PORT 2181.
docker run -p 2181:2181 zookeeper
  • Start Kafka Container, expose PORT 9092 and setup ENV variables.
docker run -p 9092:9092 \
-e KAFKA_ZOOKEEPER_CONNECT=<PRIVATE_IP>:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://<PRIVATE_IP>:9092 \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
confluentinc/cp-kafka

Code

client.js

const { Kafka } = require("kafkajs");

exports.kafka = new Kafka({
  clientId: "my-app",
  brokers: ["<PRIVATE_IP>:9092"],
});

admin.js

const { kafka } = require("./client");

async function init() {
  const admin = kafka.admin();
  console.log("Admin connecting...");
  admin.connect();
  console.log("Adming Connection Success...");

  console.log("Creating Topic [rider-updates]");
  await admin.createTopics({
    topics: [
      {
        topic: "rider-updates",
        numPartitions: 2,
      },
    ],
  });
  console.log("Topic Created Success [rider-updates]");

  console.log("Disconnecting Admin..");
  await admin.disconnect();
}

init();

producer.js

const { kafka } = require("./client");
const readline = require("readline");

const rl = readline.createInterface({
  input: process.stdin,
  output: process.stdout,
});

async function init() {
  const producer = kafka.producer();

  console.log("Connecting Producer");
  await producer.connect();
  console.log("Producer Connected Successfully");

  rl.setPrompt("> ");
  rl.prompt();

  rl.on("line", async function (line) {
    const [riderName, location] = line.split(" ");
    await producer.send({
      topic: "rider-updates",
      messages: [
        {
          partition: location.toLowerCase() === "north" ? 0 : 1,
          key: "location-update",
          value: JSON.stringify({ name: riderName, location }),
        },
      ],
    });
  }).on("close", async () => {
    await producer.disconnect();
  });
}

init();

consumer.js

const { kafka } = require("./client");
const group = process.argv[2];

async function init() {
  const consumer = kafka.consumer({ groupId: group });
  await consumer.connect();

  await consumer.subscribe({ topics: ["rider-updates"], fromBeginning: true });

  await consumer.run({
    eachMessage: async ({ topic, partition, message, heartbeat, pause }) => {
      console.log(
        `${group}: [${topic}]: PART:${partition}:`,
        message.value.toString()
      );
    },
  });
}

init();

Running Locally

  • Run Multiple Consumers
node consumer.js <GROUP_NAME>
  • Create Producer
node producer.js
> tony south
> tony north
@pranayaapmor
Copy link

pranayaapmor commented Aug 22, 2024

Instead of <PRIVATE_IP> i want to use localhost, but its not working,(changed in kafka container and client.js)
is there any solution for this?

@Sonu-Hansda
Copy link

Instead of <PRIVATE_IP> i want to use localhost, but its not working,(changed in kafka container and client.js) is there any solution for this?

When running Kafka in Docker, you can't use localhost or 127.0.0.1 as the value for KAFKA_ZOOKEEPER_CONNECT and KAFKA_ADVERTISED_LISTENERS. This is because localhost and 127.0.0.1 refer to the container's own loopback interface, not the host machine's IP address. So you have to use your own IPv4 address

@rohhann12
Copy link

rohhann12 commented Sep 7, 2024

Instead of <PRIVATE_IP> i want to use localhost, but its not working,(changed in kafka container and client.js) is there any solution for this?

When running Kafka in Docker, you can't use localhost or 127.0.0.1 as the value for KAFKA_ZOOKEEPER_CONNECT and KAFKA_ADVERTISED_LISTENERS. This is because localhost and 127.0.0.1 refer to the container's own loopback interface, not the host machine's IP address. So you have to use your own IPv4 address.

for this you can use -p and expose your machine 's ports

@abhishek1savaliya
Copy link

Thanks this is the best tutorial for Kafka and works fine.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment