Skip to content

Instantly share code, notes, and snippets.

@hermesespinola
Created August 12, 2021 04:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hermesespinola/fc601e3ac5eb153bf541d2abb90f9caf to your computer and use it in GitHub Desktop.
Save hermesespinola/fc601e3ac5eb153bf541d2abb90f9caf to your computer and use it in GitHub Desktop.
Two simple kafka consumers with lag
const { Kafka } = require('kafkajs');
const topic = 'test-burrow';
const groupId = `test-burrow-group-699`;
const clientId = 'kafka-burrow-test';
const nConsumers = 2;
const consumerDelays = Array.from({ length: nConsumers }).map((_, i) => 500 + 500 * i);
console.log(`GroupID is ${groupId}`);
const consumers = consumerDelays
.map(() => new Kafka({ brokers: ['localhost:9092'], clientId }).consumer({ groupId, allowAutoTopicCreation: true }));
const sleep = (time) => new Promise((resolve) => setTimeout(resolve, time));
consumers.map(async (consumer, i) => {
await Promise.all([
consumer.connect().then(() => consumer.subscribe({
topic,
fromBeginning: true,
})),
sleep(500),
]);
consumer.run({
autoCommitInterval: 5000,
autoCommitThreshold: 10000,
eachMessage: async ({ partition, message }) => {
await sleep(consumerDelays[i]);
console.log(`${i} : ${partition} : ${message.offset}`);
}
});
});
consumers.map((consumer) => {
consumer.on(consumer.events.COMMIT_OFFSETS, (payload) => {
console.log('Commit offsets:', payload);
})
})
process.on('SIGINT', async () => {
console.log('Closing consumers.');
await Promise.allSettled(consumers.map((consumer) => consumer.disconnect()));
console.log('Closed consumers.');
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment