Skip to content

Instantly share code, notes, and snippets.

@piyushgarg-dev
Last active June 22, 2025 09:16
Show Gist options
  • Save piyushgarg-dev/32cadf6420c452b66a9a6d977ade0b01 to your computer and use it in GitHub Desktop.
Save piyushgarg-dev/32cadf6420c452b66a9a6d977ade0b01 to your computer and use it in GitHub Desktop.
Kafka Crash Course

Kafka

Video Link: Apache Kafka Crash Course | What is Kafka?

Prerequisite

Commands

  • Start Zookeper Container and expose PORT 2181.
docker run -p 2181:2181 zookeeper
  • Start Kafka Container, expose PORT 9092 and setup ENV variables.
docker run -p 9092:9092 \
-e KAFKA_ZOOKEEPER_CONNECT=<PRIVATE_IP>:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://<PRIVATE_IP>:9092 \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
confluentinc/cp-kafka

Code

client.js

const { Kafka } = require("kafkajs");

exports.kafka = new Kafka({
  clientId: "my-app",
  brokers: ["<PRIVATE_IP>:9092"],
});

admin.js

const { kafka } = require("./client");

async function init() {
  const admin = kafka.admin();
  console.log("Admin connecting...");
  admin.connect();
  console.log("Adming Connection Success...");

  console.log("Creating Topic [rider-updates]");
  await admin.createTopics({
    topics: [
      {
        topic: "rider-updates",
        numPartitions: 2,
      },
    ],
  });
  console.log("Topic Created Success [rider-updates]");

  console.log("Disconnecting Admin..");
  await admin.disconnect();
}

init();

producer.js

const { kafka } = require("./client");
const readline = require("readline");

const rl = readline.createInterface({
  input: process.stdin,
  output: process.stdout,
});

async function init() {
  const producer = kafka.producer();

  console.log("Connecting Producer");
  await producer.connect();
  console.log("Producer Connected Successfully");

  rl.setPrompt("> ");
  rl.prompt();

  rl.on("line", async function (line) {
    const [riderName, location] = line.split(" ");
    await producer.send({
      topic: "rider-updates",
      messages: [
        {
          partition: location.toLowerCase() === "north" ? 0 : 1,
          key: "location-update",
          value: JSON.stringify({ name: riderName, location }),
        },
      ],
    });
  }).on("close", async () => {
    await producer.disconnect();
  });
}

init();

consumer.js

const { kafka } = require("./client");
const group = process.argv[2];

async function init() {
  const consumer = kafka.consumer({ groupId: group });
  await consumer.connect();

  await consumer.subscribe({ topics: ["rider-updates"], fromBeginning: true });

  await consumer.run({
    eachMessage: async ({ topic, partition, message, heartbeat, pause }) => {
      console.log(
        `${group}: [${topic}]: PART:${partition}:`,
        message.value.toString()
      );
    },
  });
}

init();

Running Locally

  • Run Multiple Consumers
node consumer.js <GROUP_NAME>
  • Create Producer
node producer.js
> tony south
> tony north
@ssk090
Copy link

ssk090 commented Nov 14, 2024

Here is the same kafka implementation using typescript : https://gist.github.com/ssk090/852139926f7157ff61baccd40e900d0c

@idabora
Copy link

idabora commented Nov 19, 2024

I am not able to generate and see SOUTH location. Its showing only NORTH

  • change the topic from admin.js consumer.js and producer.js
  • run admin.js (node admin.js)
    ** Then the partitioner will be form and assign as it should be **

@ashpeak
Copy link

ashpeak commented Jan 11, 2025

please make a video on the implementation of long polling and push

Paid courses for them, No one will teach everything for free.

@ANKIT3412
Copy link

[cause]: KafkaJSConnectionError: Connection timeout
at Timeout.onTimeout [as _onTimeout] (C:\Users\Dell\Desktop\kafka-app\node_modules\kafkajs\src\network\connection.js:223:23)
at listOnTimeout (node:internal/timers:594:17)
at process.processTimers (node:internal/timers:529:7) {
retriable: true,
helpUrl: undefined,
broker: '192.168.0.204:9092',
code: undefined,
[cause]: undefined

while running any js file this same error is showing. kindly help

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment