Skip to content

Instantly share code, notes, and snippets.

@mkocsar
Last active August 18, 2023 11:02
Show Gist options
  • Save mkocsar/b58191139784d9244b62b33b13dd4ab8 to your computer and use it in GitHub Desktop.
Save mkocsar/b58191139784d9244b62b33b13dd4ab8 to your computer and use it in GitHub Desktop.
KafkaJS send duration measurement with thousands of topics
import { hrtime } from 'node:process';
import { Kafka } from 'kafkajs';
// --- Test parameters ---
// Address of the Kafka broker the client bootstraps from.
const CLUSTER_URL = 'localhost:9092';
// Number of distinct topics the producer cycles through.
const TOPIC_COUNT = 3500;
// Target send rate; the send timer fires every 1000 / TPS milliseconds.
const TPS = 25;
// Seconds between two console reports.
const REPORT_WINDOW_S = 1;
// Seconds the measured phase runs before shutdown is triggered.
const TEST_DURATION_S = 60;
// --- End of test parameters ---

// Kafka client shared by the rest of the script.
const kafka = new Kafka({ clientId: 'mkocsar-kafkajs-test', brokers: [CLUSTER_URL] });

// Auto topic creation is switched off, so the target topics are presumably
// created ahead of time by a separate step.
const producer = kafka.producer({ allowAutoTopicCreation: false });

await producer.connect();
// Round-robin cursor: index of the topic the next send will target.
let nextTopicIndex = 0;

/**
 * Sends one dummy message to the topic under the round-robin cursor and
 * advances the cursor, wrapping around after TOPIC_COUNT topics.
 *
 * The cursor is advanced before the send is awaited, so overlapping calls
 * target different topics.
 *
 * @returns {Promise<void>} Settles when `producer.send` settles.
 */
const sendEvent = async () => {
  const topicIndex = nextTopicIndex;
  nextTopicIndex = (topicIndex + 1) % TOPIC_COUNT;

  const record = {
    topic: `mkocsar.kafkajs.test-${topicIndex}`,
    messages: [{ value: 'dummy-value' }],
  };
  await producer.send(record);
};
// Most recent send durations in milliseconds, oldest first, and their running sum.
let measuredDurations = [];
let measuredDurationsSum = 0;
// Upper bound on how many durations are kept: one report window's worth of
// sends at the target rate.
const measuredDurationCount = REPORT_WINDOW_S * TPS;
// Number of sends completed since the last report was printed.
let reportWindowEventCount = 0;
/**
 * Sends one event and records how long the send took.
 *
 * The elapsed time (in milliseconds, from the high-resolution clock) is
 * appended to `measuredDurations` and added to `measuredDurationsSum`. Once
 * more than `measuredDurationCount` samples are stored, the oldest one is
 * evicted and subtracted from the sum. The per-window event counter is
 * incremented for every completed send.
 *
 * If `sendEvent` rejects, nothing is recorded and the rejection propagates.
 *
 * @returns {Promise<void>}
 */
const sendEventMeasured = async () => {
  const startedAtNs = hrtime.bigint();
  await sendEvent();
  const elapsedNs = hrtime.bigint() - startedAtNs;
  const elapsedMs = Number(elapsedNs) / 1_000_000;

  reportWindowEventCount += 1;
  measuredDurationsSum += elapsedMs;
  measuredDurations.push(elapsedMs);

  if (measuredDurations.length <= measuredDurationCount) {
    return;
  }

  // The window is over capacity: drop the oldest sample and keep the sum in step.
  const evictedMs = measuredDurations.shift();
  measuredDurationsSum -= evictedMs;
};
/**
 * Prints one report line and resets the per-window event counter.
 *
 * The line shows the report window length, the number of sends completed
 * since the previous report, and the average of the currently stored send
 * durations formatted with three decimals. When no duration has been
 * recorded yet the average is shown as "n/a" instead of dividing by zero
 * (which would print "NaN ms").
 *
 * @returns {void}
 */
const report = () => {
  const sampleCount = measuredDurations.length;
  const averageText = sampleCount > 0
    ? `${(measuredDurationsSum / sampleCount).toFixed(3)} ms`
    : 'n/a';
  console.log(`Window: ${REPORT_WINDOW_S} s ; Events: ${reportWindowEventCount} ; Average 'send' duration: ${averageText}`);
  reportWindowEventCount = 0;
};
// Warm-up: send one unmeasured message to every topic, one at a time, before
// the measured phase starts.
for (let index = 0; index < TOPIC_COUNT; ++index) {
  await sendEvent();
}

const sendEventDelayMs = 1000 / TPS;

// Measured sends that have been started but have not settled yet.
const inFlightSends = new Set();

// Timer callback: starts one measured send without leaving its promise
// floating. A failed send is logged instead of surfacing as an unhandled
// rejection, and the promise is tracked so shutdown can wait for it.
const startMeasuredSend = () => {
  const pending = sendEventMeasured()
    .catch((error) => {
      console.error('Measured send failed:', error);
    })
    .finally(() => {
      inFlightSends.delete(pending);
    });
  inFlightSends.add(pending);
};

const sendEventIntervalId = setInterval(startMeasuredSend, sendEventDelayMs);
const reportIntervalId = setInterval(report, REPORT_WINDOW_S * 1000);

/**
 * Stops both timers, waits for the sends that are still in flight, and then
 * disconnects the producer.
 *
 * @returns {Promise<void>} Rejects if disconnecting the producer fails.
 */
const shutdown = async () => {
  clearInterval(sendEventIntervalId);
  clearInterval(reportIntervalId);
  // Let outstanding sends settle first so the producer is not disconnected
  // underneath them.
  await Promise.allSettled(inFlightSends);
  await producer.disconnect();
};

// End the test after the configured duration; report a failed shutdown
// through the exit code rather than an unhandled rejection.
setTimeout(() => {
  shutdown().catch((error) => {
    console.error('Shutdown failed:', error);
    process.exitCode = 1;
  });
}, TEST_DURATION_S * 1000);
import { Kafka } from 'kafkajs';
// ---
// Address of the Kafka broker the client bootstraps from.
const CLUSTER_URL = 'localhost:9092';
// Number of test topics to create (mkocsar.kafkajs.test-0 .. test-<TOPIC_COUNT - 1>).
const TOPIC_COUNT = 3500;
// ---
const kafka = new Kafka({
  clientId: 'mkocsar-kafkajs-test',
  brokers: [ CLUSTER_URL ]
});
const admin = kafka.admin();
await admin.connect();
try {
  // Topics are created one request at a time so progress is logged per topic.
  for (let index = 0; index < TOPIC_COUNT; ++index) {
    const topic = `mkocsar.kafkajs.test-${index}`;
    // createTopics resolves to false when the topic already exists, so only
    // report "Created" when something was actually created.
    const created = await admin.createTopics({
      topics: [{
        topic,
        numPartitions: 2,
        replicationFactor: 1
      }]
    });
    console.log(created ? `Created ${topic}` : `Skipped ${topic} (already exists)`);
  }
} finally {
  // Release the admin connection even when a creation request fails.
  await admin.disconnect();
}
import { Kafka } from 'kafkajs';
// ---
// Address of the Kafka broker the client bootstraps from.
const CLUSTER_URL = 'localhost:9092';
// Number of test topics to delete (mkocsar.kafkajs.test-0 .. test-<TOPIC_COUNT - 1>).
const TOPIC_COUNT = 3500;
// ---
const kafka = new Kafka({ clientId: 'mkocsar-kafkajs-test', brokers: [CLUSTER_URL] });
const admin = kafka.admin();
await admin.connect();

// Names of every test topic, in ascending index order.
const topicNames = Array.from(
  { length: TOPIC_COUNT },
  (_, index) => `mkocsar.kafkajs.test-${index}`,
);

// Delete the topics one request at a time, logging each one once its request completes.
for (const topic of topicNames) {
  await admin.deleteTopics({ topics: [topic] });
  console.log(`Deleted ${topic}`);
}

await admin.disconnect();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment