@piyushgarg-dev
Last active December 3, 2023 13:26
Kafka Crash Course

Kafka

Video Link: Apache Kafka Crash Course | What is Kafka?

Prerequisite

Commands

  • Start ZooKeeper container and expose port 2181.
docker run -p 2181:2181 zookeeper
  • Start Kafka container, expose port 9092, and set up the environment variables.
docker run -p 9092:9092 \
-e KAFKA_ZOOKEEPER_CONNECT=<PRIVATE_IP>:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://<PRIVATE_IP>:9092 \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
confluentinc/cp-kafka

Code

client.js

const { Kafka } = require("kafkajs");

exports.kafka = new Kafka({
  clientId: "my-app",
  brokers: ["<PRIVATE_IP>:9092"],
});
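The broker address above is a placeholder that has to be edited by hand. As a minimal sketch, assuming you would rather pass it in from the shell, the helper below reads a comma-separated broker list from an environment variable. The name KAFKA_BROKERS and the function brokersFromEnv are hypothetical and not part of the gist or of kafkajs.

```javascript
// Hypothetical helper: parse a comma-separated broker list from an env var.
// KAFKA_BROKERS is an assumed variable name, not something the gist defines.
function brokersFromEnv(env = process.env, fallback = "localhost:9092") {
  const raw = env.KAFKA_BROKERS || fallback;
  return raw
    .split(",")
    .map((entry) => entry.trim())
    .filter((entry) => entry.length > 0);
}

// Possible use in client.js (needs kafkajs installed):
// const { Kafka } = require("kafkajs");
// exports.kafka = new Kafka({ clientId: "my-app", brokers: brokersFromEnv() });
```

With this in place, something like KAFKA_BROKERS=<PRIVATE_IP>:9092 node producer.js would select the broker without editing the file.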

admin.js

const { kafka } = require("./client");

async function init() {
  const admin = kafka.admin();
  console.log("Admin connecting...");
  // Wait for the connection before logging success or creating topics.
  await admin.connect();
  console.log("Admin Connection Success...");

  console.log("Creating Topic [rider-updates]");
  await admin.createTopics({
    topics: [
      {
        topic: "rider-updates",
        numPartitions: 2,
      },
    ],
  });
  console.log("Topic Created Success [rider-updates]");

  console.log("Disconnecting Admin..");
  await admin.disconnect();
}

init();

producer.js

const { kafka } = require("./client");
const readline = require("readline");

const rl = readline.createInterface({
  input: process.stdin,
  output: process.stdout,
});

async function init() {
  const producer = kafka.producer();

  console.log("Connecting Producer");
  await producer.connect();
  console.log("Producer Connected Successfully");

  rl.setPrompt("> ");
  rl.prompt();

  rl.on("line", async function (line) {
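    const [riderName, location] = line.trim().split(/\s+/);
    if (!riderName || !location) {
      // Skip incomplete input instead of crashing on location.toLowerCase().
      console.log("Usage: <riderName> <north|south>");
      return rl.prompt();
    }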
    await producer.send({
      topic: "rider-updates",
      messages: [
        {
          partition: location.toLowerCase() === "north" ? 0 : 1,
          key: "location-update",
          value: JSON.stringify({ name: riderName, location }),
        },
      ],
    });
  }).on("close", async () => {
    await producer.disconnect();
  });
}

init();
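The producer sends "north" updates to partition 0 and everything else to partition 1. A minimal sketch of that mapping as a standalone function, which makes it easy to check without a running broker; parseRiderUpdate is a hypothetical name, not part of the gist.

```javascript
// Hypothetical helper: turn a line such as "tony south" into a message for
// the two-partition rider-updates topic (north -> 0, anything else -> 1,
// the same mapping producer.js uses).
function parseRiderUpdate(line) {
  const [riderName, location] = line.trim().split(/\s+/);
  if (!riderName || !location) {
    return null; // incomplete input; the caller can skip it
  }
  return {
    partition: location.toLowerCase() === "north" ? 0 : 1,
    key: "location-update",
    value: JSON.stringify({ name: riderName, location }),
  };
}

console.log(parseRiderUpdate("tony south"));
```

Note that any location other than "north" (including a misspelling) ends up on partition 1.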

consumer.js

const { kafka } = require("./client");
const group = process.argv[2];

async function init() {
  const consumer = kafka.consumer({ groupId: group });
  await consumer.connect();

  await consumer.subscribe({ topics: ["rider-updates"], fromBeginning: true });

  await consumer.run({
    eachMessage: async ({ topic, partition, message, heartbeat, pause }) => {
      console.log(
        `${group}: [${topic}]: PART:${partition}:`,
        message.value.toString()
      );
    },
  });
}

init();
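In kafkajs, message.value can be null (for example for a record sent without a value), in which case message.value.toString() would throw. A minimal sketch of a formatter that tolerates this; formatMessage is a hypothetical helper, and the "<null>" marker is an arbitrary choice.

```javascript
// Hypothetical helper: build the same log line as consumer.js, but
// tolerate messages whose value is null.
function formatMessage(group, topic, partition, message) {
  const body = message.value === null ? "<null>" : message.value.toString();
  return `${group}: [${topic}]: PART:${partition}: ${body}`;
}

console.log(
  formatMessage("group-1", "rider-updates", 1, { value: Buffer.from("hello") })
);
```

Inside eachMessage it could be called as console.log(formatMessage(group, topic, partition, message)).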

Running Locally

  • Create Topic
node admin.js
  • Run Multiple Consumers
node consumer.js <GROUP_NAME>
  • Create Producer
node producer.js
> tony south
> tony north
@Riyazkhan1989

I am not able to generate and see the SOUTH location. It's showing only NORTH.

@mdalishanali

You probably have a typo.

@Riyazkhan1989

I just copy-pasted everything.

@Saibejjani

Change the topic to something else, like rider-status. I had the same issue. I think it keeps whatever we deployed first, so just change the topic. I hope it will work as intended.

@Shailendra-Vishwakarma

Thanks @piyushgarg-dev for the amazing tutorial. I successfully deployed it on my local machine.

@legcy143

legcy143 commented Sep 2, 2023

I got this error: Unable to find image ':2181' locally

@gunjan-it-engg

Everything is working successfully!
Great explanation, brother. Very helpful!
Thank you so much!

@LEO1612D

LEO1612D commented Sep 4, 2023

I am not able to generate and see the SOUTH location. It's showing only NORTH.

I also had this same issue. What worked for me:

  • In the admin.js file, update the topic name
  • Run admin.js again
  • Run the producer and consumer again

@blacksmoke26

In case of docker troubleshooting:

docker run -d --name zookeeper -p 2181:2181 jplock/zookeeper
docker run -d --name kafka -p 7203:7203 -p 9092:9092 -e KAFKA_ADVERTISED_HOST_NAME=10.4.1.29 -e ZOOKEEPER_IP=10.4.1.29 ches/kafk

@itishprasad30

It only generates updates for north; the south location is not generated.

@mohammad-quanit

mohammad-quanit commented Oct 15, 2023

The Docker command fails when running Kafka. It is somehow unable to reach ZooKeeper, and I am getting the error below. Is anyone facing the same issue? Let me know.

Errors:  KafkaJSNonRetriableError
  Caused by: KafkaJSConnectionError: Connection error: connect ECONNREFUSED MY_PRIVATE_IP:9092
    at Socket.onError (/home/mquanit/Desktop/Projects/kafka/nodejs-kafka/node_modules/kafkajs/src/network/connection.js:210:23)
    ... 3 lines matching cause stack trace ...
    at process.processTicksAndRejections (node:internal/process/task_queues:82:21) {
  name: 'KafkaJSNumberOfRetriesExceeded',
  retriable: false,
  helpUrl: undefined,
  retryCount: 5,
  retryTime: 11306,
  [cause]: KafkaJSConnectionError: Connection error: connect ECONNREFUSED MY_PRIVATE_IP:9092
      at Socket.onError (/home/mquanit/Desktop/Projects/kafka/nodejs-kafka/node_modules/kafkajs/src/network/connection.js:210:23)
      at Socket.emit (node:events:513:28)
      at emitErrorNT (node:internal/streams/destroy:151:8)
      at emitErrorCloseNT (node:internal/streams/destroy:116:3)
      at process.processTicksAndRejections (node:internal/process/task_queues:82:21) {
    retriable: true,
    helpUrl: undefined,
    broker: 'MY_PRIVATE_IP:9092',
    code: 'ECONNREFUSED',
    [cause]: undefined
  }
}

@navneetgzb

Getting an error while running the command: INFO SASL config status: Will not attempt to authenticate using SASL (unknown error).

sudo docker run -p 9092:9092 \
-e KAFKA_ZOOKEEPER_CONNECT=192.168.43.70:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.43.70:9092 \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
confluentinc/cp-kafka

Please help me with this.

@Subhanshu-2411

Getting an error while running the command: INFO SASL config status: Will not attempt to authenticate using SASL (unknown error).

sudo docker run -p 9092:9092 \
-e KAFKA_ZOOKEEPER_CONNECT=192.168.43.70:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.43.70:9092 \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
confluentinc/cp-kafka

Please help me with this.

Remove the \ characters and run the complete command on one line:

docker run -p 9092:9092 -e KAFKA_ZOOKEEPER_CONNECT=192.168.43.70:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.43.70:9092 -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 confluentinc/cp-kafka

@Subhanshu-2411

The Docker command fails when running Kafka. It is somehow unable to reach ZooKeeper, and I am getting the error below. Is anyone facing the same issue? Let me know.

Errors:  KafkaJSNonRetriableError
  Caused by: KafkaJSConnectionError: Connection error: connect ECONNREFUSED MY_PRIVATE_IP:9092
    at Socket.onError (/home/mquanit/Desktop/Projects/kafka/nodejs-kafka/node_modules/kafkajs/src/network/connection.js:210:23)
    ... 3 lines matching cause stack trace ...
    at process.processTicksAndRejections (node:internal/process/task_queues:82:21) {
  name: 'KafkaJSNumberOfRetriesExceeded',
  retriable: false,
  helpUrl: undefined,
  retryCount: 5,
  retryTime: 11306,
  [cause]: KafkaJSConnectionError: Connection error: connect ECONNREFUSED MY_PRIVATE_IP:9092
      at Socket.onError (/home/mquanit/Desktop/Projects/kafka/nodejs-kafka/node_modules/kafkajs/src/network/connection.js:210:23)
      at Socket.emit (node:events:513:28)
      at emitErrorNT (node:internal/streams/destroy:151:8)
      at emitErrorCloseNT (node:internal/streams/destroy:116:3)
      at process.processTicksAndRejections (node:internal/process/task_queues:82:21) {
    retriable: true,
    helpUrl: undefined,
    broker: 'MY_PRIVATE_IP:9092',
    code: 'ECONNREFUSED',
    [cause]: undefined
  }
}

Replace <PRIVATE_IP> with your machine's current IP address, which you can find using 'ifconfig' or 'ipconfig'.
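If you prefer to look the address up from Node.js instead, here is a minimal sketch using the built-in os module. listIPv4Addresses is a hypothetical helper name. It lists every non-loopback IPv4 address, so you still have to pick the interface your Docker containers can reach.

```javascript
const os = require("node:os");

// Hypothetical helper: list non-internal IPv4 addresses per interface.
// Some Node.js versions report family as the number 4 instead of "IPv4",
// so both forms are accepted.
function listIPv4Addresses(interfaces = os.networkInterfaces()) {
  const found = [];
  for (const [name, addrs] of Object.entries(interfaces)) {
    for (const addr of addrs || []) {
      const isIPv4 = addr.family === "IPv4" || addr.family === 4;
      if (isIPv4 && !addr.internal) {
        found.push({ name, address: addr.address });
      }
    }
  }
  return found;
}

console.log(listIPv4Addresses());
```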

@DeepanshuGupta8990

I am not able to generate and see the SOUTH location. It's showing only NORTH.

Same issue.

@DeepanshuGupta8990

Change the topic to something else, like rider-status. I had the same issue. I think it keeps whatever we deployed first, so just change the topic. I hope it will work as intended.

This works... but why?
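One possible explanation, offered as an assumption rather than something the gist confirms: if rider-updates already exists with a single partition (for example because it was auto-created before admin.js ran), createTopics does not change it, so there is no partition 1 for the "south" messages. A new topic name sidesteps that because it is created fresh with two partitions. A minimal sketch for checking and growing the partition count instead, assuming a connected kafkajs admin client is passed in; missingPartitions and ensurePartitions are hypothetical helper names.

```javascript
// Hypothetical helper: how many partitions are missing, given the object
// returned by kafkajs admin.fetchTopicMetadata().
function missingPartitions(metadata, topic, wanted) {
  const entry = metadata.topics.find((t) => t.name === topic);
  const current = entry ? entry.partitions.length : 0;
  return Math.max(0, wanted - current);
}

// `admin` is assumed to be a connected kafkajs admin client.
// Kafka can only increase a topic's partition count, never reduce it.
async function ensurePartitions(admin, topic, wanted) {
  const metadata = await admin.fetchTopicMetadata({ topics: [topic] });
  if (missingPartitions(metadata, topic, wanted) > 0) {
    await admin.createPartitions({
      topicPartitions: [{ topic, count: wanted }],
    });
  }
}
```

It could be called in admin.js as await ensurePartitions(admin, "rider-updates", 2) before disconnecting.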

@AnkitRanyal

Please make a video on the implementation of long polling and push.
