Skip to content

Instantly share code, notes, and snippets.

@ankon
Created November 6, 2017 11:55
Show Gist options
  • Save ankon/9c2263bf1d04600129aeaa06e8c49179 to your computer and use it in GitHub Desktop.
Save ankon/9c2263bf1d04600129aeaa06e8c49179 to your computer and use it in GitHub Desktop.
Consume all events in a kafka topic using rdkafka
/**
 * Consume every event currently available in a kafka topic, then stop.
 *
 * A read stream is created in batch mode and piped into a writable sink. After each
 * batch the consumer's positions are compared against the high watermark of the
 * corresponding partition; once every reported position has reached its high
 * watermark the stream is destroyed and the returned promise resolves on 'close'.
 *
 * @param {string} kafkaBrokers Value used for 'metadata.broker.list'
 * @param {string} groupId Value used for 'group.id'
 * @param {Object} globalOptions Additional consumer options; these override the defaults set here
 * @param {string} topic Topic to read; also used when querying the watermark offsets
 * @param {Function} handleEvent Called synchronously with each event of each received batch
 * @returns {Promise<void>} Resolves when the stream has closed; rejects with the first error
 *   reported by the stream, the consumer ('event.error'), a watermark query, or `handleEvent`.
 */
function consumeAllEvents(kafkaBrokers, groupId, globalOptions, topic, handleEvent) {
  const options = Object.assign({
    'enable.auto.commit': false,
    'group.id': groupId,
    'metadata.broker.list': kafkaBrokers,
    'socket.keepalive.enable': true,
  }, globalOptions);
  const topicOptions = {
    'auto.offset.reset': 'earliest'
  };
  const streamOptions = {
    objectMode: true,
    streamAsBatch: true,
    topics: topic,
    waitInterval: 0,
  };

  return new Promise((resolve, reject) => {
    // The outer promise must be settled at most once, whichever event arrives first.
    let settled = false;
    const succeed = () => {
      if (!settled) {
        settled = true;
        resolve();
      }
    };
    const fail = err => {
      if (!settled) {
        settled = true;
        reject(err);
      }
    };

    const stream = kafka.KafkaConsumer.createReadStream(options, topicOptions, streamOptions);
    stream.on('error', err => {
      logger.error(`Error reading from kafka: ${err.message}`);
      fail(err);
    });
    stream.on('close', () => {
      logger.info('Reached end of stream');
      succeed();
    });
    stream.consumer.on('event.error', err => {
      logger.error(err);
      // XXX: Different type of error?
      fail(err);
    });

    // Resolves to true when the given position has reached the partition's high watermark.
    const hasReachedEnd = position => new Promise((resolveQuery, rejectQuery) => {
      stream.consumer.queryWatermarkOffsets(topic, position.partition, (err, watermarks) => {
        if (err) {
          // Without this check `watermarks` is undefined and the comparison below would throw.
          return rejectQuery(err);
        }
        console.log(`queryWatermarkOffsets: ${topic}/${position.partition}: ${watermarks.lowOffset} < ${position.offset} < ${watermarks.highOffset}`);
        // We're at the end right now, and can stop. Note that there
        // may still be events in-flight, so the main promise is not yet "resolved".
        return resolveQuery(position.offset === watermarks.highOffset);
      });
    });

    let closing = false;
    const collectData = new Writable({
      objectMode: true,
      write(chunks, encoding, callback) {
        let positions;
        try {
          for (const chunk of chunks) {
            handleEvent(chunk);
          }
          if (closing) {
            // The reader is already closed, so we're just consuming the pending events now.
            return callback();
          }
          // Check all positions now: if all of them have reached the end, then we can stop reading events.
          positions = stream.consumer.position();
        } catch (err) {
          // Report handler/consumer failures through the sink instead of throwing out of write().
          return callback(err);
        }

        return Promise.all(positions.map(hasReachedEnd))
          .then(reachedEnds => {
            if (reachedEnds.every(reachedEnd => reachedEnd)) {
              console.log('All partitions reached, stopping');
              closing = true;
              stream.destroy();
            }
          })
          // Invoke the write callback exactly once, with the error if anything above failed.
          .then(() => callback(), callback);
      }
    });
    collectData.on('error', err => {
      // pipe() only unpipes on a sink error; stop the reader explicitly and report the failure.
      fail(err);
      stream.destroy();
    });
    stream.pipe(collectData);
  });
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment