@piyushgarg-dev
Last active May 24, 2024 08:31
Kafka Crash Course

Kafka

Video Link: Apache Kafka Crash Course | What is Kafka?

Prerequisite

Commands

  • Start a ZooKeeper container and expose port 2181.
docker run -p 2181:2181 zookeeper
  • Start a Kafka container, expose port 9092, and set the environment variables (replace <PRIVATE_IP> with your machine's private IP address).
docker run -p 9092:9092 \
-e KAFKA_ZOOKEEPER_CONNECT=<PRIVATE_IP>:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://<PRIVATE_IP>:9092 \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
confluentinc/cp-kafka

Code

client.js

const { Kafka } = require("kafkajs");

exports.kafka = new Kafka({
  clientId: "my-app",
  brokers: ["<PRIVATE_IP>:9092"],
});

admin.js

const { kafka } = require("./client");

async function init() {
  const admin = kafka.admin();
  console.log("Admin connecting...");
  // Wait for the connection before issuing admin requests.
  await admin.connect();
  console.log("Admin Connection Success...");

  console.log("Creating Topic [rider-updates]");
  await admin.createTopics({
    topics: [
      {
        topic: "rider-updates",
        numPartitions: 2,
      },
    ],
  });
  console.log("Topic Created Success [rider-updates]");

  console.log("Disconnecting Admin..");
  await admin.disconnect();
}

init();

producer.js

const { kafka } = require("./client");
const readline = require("readline");

const rl = readline.createInterface({
  input: process.stdin,
  output: process.stdout,
});

async function init() {
  const producer = kafka.producer();

  console.log("Connecting Producer");
  await producer.connect();
  console.log("Producer Connected Successfully");

  rl.setPrompt("> ");
  rl.prompt();

  rl.on("line", async function (line) {
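    const [riderName, location] = line.trim().split(/\s+/);
    // Skip malformed input instead of crashing on location.toLowerCase().
    if (!riderName || !location) return;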
    await producer.send({
      topic: "rider-updates",
      messages: [
        {
          partition: location.toLowerCase() === "north" ? 0 : 1,
          key: "location-update",
          value: JSON.stringify({ name: riderName, location }),
        },
      ],
    });
  }).on("close", async () => {
    await producer.disconnect();
  });
}

init();
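
The `line` handler in producer.js expects input of the form `<name> <location>`. A minimal sketch of pulling that parsing into its own function so it can be checked without a broker; `parseRiderUpdate` is a hypothetical name, not part of the gist, and the partition rule is copied from producer.js.

```javascript
// Hypothetical helper (not in the original gist): turns "tony south" into
// the message object that producer.js passes to producer.send().
function parseRiderUpdate(line) {
  const [riderName, location] = line.trim().split(/\s+/);
  if (!riderName || !location) {
    return null; // caller should skip the line instead of sending
  }
  return {
    // Same rule as producer.js: "north" -> partition 0, anything else -> 1.
    partition: location.toLowerCase() === "north" ? 0 : 1,
    key: "location-update",
    value: JSON.stringify({ name: riderName, location }),
  };
}

console.log(parseRiderUpdate("tony south"));
console.log(parseRiderUpdate("tony")); // missing location
```

Inside the handler, the result would go into `messages: [parsed]` when it is not `null`.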

consumer.js

const { kafka } = require("./client");
const group = process.argv[2];

async function init() {
  const consumer = kafka.consumer({ groupId: group });
  await consumer.connect();

  await consumer.subscribe({ topics: ["rider-updates"], fromBeginning: true });

  await consumer.run({
    eachMessage: async ({ topic, partition, message, heartbeat, pause }) => {
      console.log(
        `${group}: [${topic}]: PART:${partition}:`,
        message.value.toString()
      );
    },
  });
}

init();

Running Locally

  • Create the topic (run once, before starting the producer or consumers)
node admin.js
  • Run Multiple Consumers
node consumer.js <GROUP_NAME>
  • Create Producer
node producer.js
> tony south
> tony north
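
Consumers started with the same <GROUP_NAME> share the topic's partitions, while consumers in different groups each receive every message. The sketch below is a simplified round-robin illustration of how the two partitions of rider-updates might be split within one group; it is not kafkajs's actual assigner code, and the consumer names are made up.

```javascript
// Simplified illustration only: real assignment is negotiated by the group
// coordinator and the client's configured partition assigner.
function assignRoundRobin(partitions, consumers) {
  const assignment = Object.fromEntries(consumers.map((c) => [c, []]));
  partitions.forEach((partition, i) => {
    assignment[consumers[i % consumers.length]].push(partition);
  });
  return assignment;
}

// One consumer in the group: it reads both partitions.
console.log(assignRoundRobin([0, 1], ["consumer-a"]));
// Two consumers in the group: one partition each.
console.log(assignRoundRobin([0, 1], ["consumer-a", "consumer-b"]));
// Three consumers, two partitions: one consumer sits idle.
console.log(assignRoundRobin([0, 1], ["consumer-a", "consumer-b", "consumer-c"]));
```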
@A1996KASH

I need to run the consumer and producer on different servers. Can someone let me know what changes I should implement to set that up?

Yes. Simply run the producer file on a different server; you will also have to copy client.js to that server.

@dr4g0n7ly

Thank you for replying but I suspect I will have issues with the docker files and with establishing a connection from the producer to the consumer. Should I be worried about that?

@A1996KASH

No, I don't think so. Try it and share the Git repo; I will check and try to help.
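
For what it's worth, the producer and consumer never connect to each other; both connect to the Kafka broker. So each server needs a client.js whose `brokers` entry is reachable from that server, and the address in KAFKA_ADVERTISED_LISTENERS must be reachable too, because clients use the advertised address after the first connection. A minimal sketch of reading the broker list from the environment so the same client.js works on every server; `KAFKA_BROKERS` and `brokersFromEnv` are assumed names, not something kafkajs or the gist defines.

```javascript
// Hypothetical helper: parses a comma-separated broker list from an
// environment-like object, falling back to a local broker.
function brokersFromEnv(env) {
  const raw = env.KAFKA_BROKERS || "localhost:9092";
  return raw
    .split(",")
    .map((entry) => entry.trim())
    .filter((entry) => entry.length > 0);
}

// In client.js the result would replace the hard-coded brokers array:
//   new Kafka({ clientId: "my-app", brokers: brokersFromEnv(process.env) })
console.log(brokersFromEnv({ KAFKA_BROKERS: "10.0.0.5:9092, 10.0.0.6:9092" }));
console.log(brokersFromEnv({}));
```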

@arshupadhyay

arshupadhyay commented Apr 5, 2024

I am getting a rebalancing error with two consumers. Does anyone know how to solve this?

@A1996KASH

Please share more details with a screenshot.

https://github.com/A1996KASH/apache-kafka-nodejs
It worked fine for me

@Akshaymk360

The south location is not visible even if I change the topic name in admin.js.

@Akshaymk360

I am not able to generate and see the SOUTH location. It's showing only NORTH.

I also had this same issue. What worked for me:

  • In admin.js, update the topic name
  • Run admin.js again
  • Run the producer and consumer again

It's not working for me, bro.

@Akshaymk360

Change the topic to something else, like rider-status; I also had the same issue. I think it keeps whatever was deployed first, so just change the topic. I hope it will work as intended.

This works... but why?

Should we change this in producer.js and consumer.js as well, or only in admin.js?

@Axhat

Axhat commented May 21, 2024

Working now, thanks! Why do you think this was the case?
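
One plausible explanation (an assumption; nothing in this thread confirms it): `admin.createTopics` does not modify a topic that already exists. If rider-updates was first created with a single partition, for example by automatic topic creation when the producer or consumer ran before admin.js, then partition 1 never existed and "south" messages had nowhere to go. A fresh topic name gets created with both partitions, and the new name has to match in admin.js, producer.js, and consumer.js. Below is a rough sketch of checking, and if needed raising, the partition count with the kafkajs admin calls `fetchTopicMetadata` and `createPartitions`; `ensurePartitions` is a hypothetical helper, and the stub only stands in for a connected admin client so the sketch runs without a broker.

```javascript
// Sketch only: `admin` is expected to be a connected kafkajs admin client.
async function ensurePartitions(admin, topic, wanted) {
  const metadata = await admin.fetchTopicMetadata({ topics: [topic] });
  const entry = metadata.topics.find((t) => t.name === topic);
  const current = entry ? entry.partitions.length : 0;
  if (current > 0 && current < wanted) {
    // `count` is the new total number of partitions, not the number to add.
    await admin.createPartitions({ topicPartitions: [{ topic, count: wanted }] });
  }
  return current; // partition count found before any change
}

// Stand-in admin that reports a topic with a single partition.
const fakeAdmin = {
  fetchTopicMetadata: async () => ({
    topics: [{ name: "rider-updates", partitions: [{ partitionId: 0 }] }],
  }),
  createPartitions: async (request) => {
    console.log("createPartitions called with", JSON.stringify(request));
  },
};

ensurePartitions(fakeAdmin, "rider-updates", 2).then((found) => {
  console.log("partitions found before the fix:", found);
});
```

In admin.js this would be called after `createTopics`, with the real `admin` object in place of the stub.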