Skip to content

Instantly share code, notes, and snippets.

@piyushgarg-dev
Last active February 21, 2024 06:37
  • Star 62 You must be signed in to star a gist
  • Fork 54 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save piyushgarg-dev/32cadf6420c452b66a9a6d977ade0b01 to your computer and use it in GitHub Desktop.
Kafka Crash Course

Kafka

Video Link: Apache Kafka Crash Course | What is Kafka?

Prerequisite

Commands

  • Start Zookeper Container and expose PORT 2181.
docker run -p 2181:2181 zookeeper
  • Start Kafka Container, expose PORT 9092 and setup ENV variables.
docker run -p 9092:9092 \
-e KAFKA_ZOOKEEPER_CONNECT=<PRIVATE_IP>:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://<PRIVATE_IP>:9092 \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
confluentinc/cp-kafka

Code

client.js

const { Kafka } = require("kafkajs");

exports.kafka = new Kafka({
  clientId: "my-app",
  brokers: ["<PRIVATE_IP>:9092"],
});

admin.js

const { kafka } = require("./client");

async function init() {
  const admin = kafka.admin();
  console.log("Admin connecting...");
  admin.connect();
  console.log("Adming Connection Success...");

  console.log("Creating Topic [rider-updates]");
  await admin.createTopics({
    topics: [
      {
        topic: "rider-updates",
        numPartitions: 2,
      },
    ],
  });
  console.log("Topic Created Success [rider-updates]");

  console.log("Disconnecting Admin..");
  await admin.disconnect();
}

init();

producer.js

const { kafka } = require("./client");
const readline = require("readline");

const rl = readline.createInterface({
  input: process.stdin,
  output: process.stdout,
});

async function init() {
  const producer = kafka.producer();

  console.log("Connecting Producer");
  await producer.connect();
  console.log("Producer Connected Successfully");

  rl.setPrompt("> ");
  rl.prompt();

  rl.on("line", async function (line) {
    const [riderName, location] = line.split(" ");
    await producer.send({
      topic: "rider-updates",
      messages: [
        {
          partition: location.toLowerCase() === "north" ? 0 : 1,
          key: "location-update",
          value: JSON.stringify({ name: riderName, location }),
        },
      ],
    });
  }).on("close", async () => {
    await producer.disconnect();
  });
}

init();

consumer.js

const { kafka } = require("./client");
const group = process.argv[2];

async function init() {
  const consumer = kafka.consumer({ groupId: group });
  await consumer.connect();

  await consumer.subscribe({ topics: ["rider-updates"], fromBeginning: true });

  await consumer.run({
    eachMessage: async ({ topic, partition, message, heartbeat, pause }) => {
      console.log(
        `${group}: [${topic}]: PART:${partition}:`,
        message.value.toString()
      );
    },
  });
}

init();

Running Locally

  • Run Multiple Consumers
node consumer.js <GROUP_NAME>
  • Create Producer
node producer.js
> tony south
> tony north
@AnkitRanyal
Copy link

please make a video on the implementation of long polling and push

@basantojha1
Copy link

Very Helpful for Starters.. Thankyou.

@ChaitanyaBinmile
Copy link

@DeepanshuGupta8990 error related to only one partition being used
confirm this code snippet in your producer.js ,it works for me (mine had scope issue)
messages: [
{
partition: location.toLowerCase() === "north" ? 0 : 1,
key: "location-update",
value: JSON.stringify({ name: riderName, location }),
},
],

@Ankit-Yadav-21
Copy link

change the topic to something else like rider-status, I also had the same issue. i think it will keep the whatever code we deployed first so just change the topic. I hope it will work as intended.

This worked for me.

@Yash3456
Copy link

I am not able to generate and see SOUTH location. Its showing only NORTH

I also had this same issue, what worked for me is

* in admin.js file, update the topic name

* run admin.js again

* run producer and consumer again it worked for me

It worked for me as well

@Yash3456
Copy link

It worked well, Admin server need to be updated with new topic name else it won;t work

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment