Last active
August 18, 2023 11:02
-
-
Save mkocsar/b58191139784d9244b62b33b13dd4ab8 to your computer and use it in GitHub Desktop.
KafkaJS send duration measurement with thousands of topics
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { hrtime } from 'node:process'; | |
import { Kafka } from 'kafkajs'; | |
// --- | |
const CLUSTER_URL = 'localhost:9092'; | |
const TOPIC_COUNT = 3500; | |
const TPS = 25; | |
const REPORT_WINDOW_S = 1; | |
const TEST_DURATION_S = 60; | |
// --- | |
const kafka = new Kafka({ | |
clientId: 'mkocsar-kafkajs-test', | |
brokers: [ CLUSTER_URL ] | |
}); | |
const producer = kafka.producer({ | |
allowAutoTopicCreation: false | |
}); | |
await producer.connect(); | |
let nextTopicIndex = 0; | |
const sendEvent = async () => { | |
const topic = `mkocsar.kafkajs.test-${nextTopicIndex}`; | |
nextTopicIndex = (nextTopicIndex + 1) % TOPIC_COUNT; | |
await producer.send({ | |
topic, | |
messages: [{ | |
value: 'dummy-value' | |
}] | |
}); | |
}; | |
let reportWindowEventCount = 0; | |
const measuredDurationCount = TPS * REPORT_WINDOW_S; | |
let measuredDurations = []; | |
let measuredDurationsSum = 0; | |
const sendEventMeasured = async () => { | |
const start = hrtime.bigint(); | |
await sendEvent(); | |
const measuredDuration = Number(hrtime.bigint() - start) / 1_000_000; | |
++reportWindowEventCount; | |
measuredDurations.push(measuredDuration); | |
measuredDurationsSum += measuredDuration; | |
if (measuredDurations.length > measuredDurationCount) { | |
measuredDurationsSum -= measuredDurations.shift(); | |
} | |
} | |
const report = () => { | |
const measuredDurationAvg = (measuredDurationsSum / measuredDurations.length).toFixed(3); | |
console.log(`Window: ${REPORT_WINDOW_S} s ; Events: ${reportWindowEventCount} ; Average 'send' duration: ${measuredDurationAvg} ms`) | |
reportWindowEventCount = 0; | |
}; | |
// Warm-up | |
for (let index = 0; index < TOPIC_COUNT; ++index) { | |
await sendEvent(); | |
} | |
const sendEventDelayMs = 1000 / TPS; | |
const sendEventIntervalId = setInterval(sendEventMeasured, sendEventDelayMs); | |
const reportIntervalId = setInterval(report, REPORT_WINDOW_S * 1000); | |
const shutdown = async () => { | |
clearInterval(sendEventIntervalId); | |
clearInterval(reportIntervalId); | |
await producer.disconnect(); | |
}; | |
setTimeout(shutdown, TEST_DURATION_S * 1000); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Kafka } from 'kafkajs'; | |
// --- | |
const CLUSTER_URL = 'localhost:9092'; | |
const TOPIC_COUNT = 3500; | |
// --- | |
const kafka = new Kafka({ | |
clientId: 'mkocsar-kafkajs-test', | |
brokers: [ CLUSTER_URL ] | |
}); | |
const admin = kafka.admin(); | |
await admin.connect(); | |
for (let index = 0; index < TOPIC_COUNT; ++index) { | |
const topic = `mkocsar.kafkajs.test-${index}`; | |
await admin.createTopics({ | |
topics: [{ | |
topic, | |
numPartitions: 2, | |
replicationFactor: 1 | |
}] | |
}); | |
console.log(`Created ${topic}`); | |
} | |
await admin.disconnect(); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Kafka } from 'kafkajs'; | |
// --- | |
const CLUSTER_URL = 'localhost:9092'; | |
const TOPIC_COUNT = 3500; | |
// --- | |
const kafka = new Kafka({ | |
clientId: 'mkocsar-kafkajs-test', | |
brokers: [ CLUSTER_URL ] | |
}); | |
const admin = kafka.admin(); | |
await admin.connect(); | |
for (let index = 0; index < TOPIC_COUNT; ++index) { | |
const topic = `mkocsar.kafkajs.test-${index}`; | |
await admin.deleteTopics({ | |
topics: [ topic ] | |
}); | |
console.log(`Deleted ${topic}`); | |
} | |
await admin.disconnect(); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment