Skip to content

Instantly share code, notes, and snippets.

@GlenTiki
Created June 20, 2016 09:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save GlenTiki/c30ce3e11ac763d3031320e133af7b79 to your computer and use it in GitHub Desktop.
Save GlenTiki/c30ce3e11ac763d3031320e133af7b79 to your computer and use it in GitHub Desktop.
var kafkaesque = require('kafkaesque');

// Settings handed to the kafkaesque factory. Only the broker list is active;
// the remaining keys were left commented out by the original author.
var consumerOptions = {
  brokers: [{ host: 'localhost', port: 9094 }]
  // clientId: 'fishy'
  // group: 'multiconsumers',
  // maxBytes: 2000000
};

// Client instance used by every produce/poll/subscribe call further down.
var consumer = kafkaesque(consumerOptions);
var topic = 'my-replicated-partitioned-topic';
var partition = 0;

/**
 * Shared completion callback for the produce() examples below.
 * Reports a failed produce instead of silently discarding it.
 * NOTE(review): assumes kafkaesque invokes produce callbacks Node-style with
 * the error as the first argument -- confirm against the library README.
 *
 * @param {*} err - Error passed by the producer, falsy on success.
 */
function onProduced(err) {
  if (err) console.log('error producing', err);
}

// should all be valid
consumer.produce({topic: topic, partition: partition}, ['message form 1'], onProduced);
consumer.produce({topic: topic}, 'message form 2', onProduced);
consumer.produce(topic, 'message form 3', onProduced);

// get a specific event emitter for each of these
consumer.poll({topic: topic, partition: partition}, poll);
consumer.poll({topic: topic, offset: 0}, poll);
consumer.poll({topic: topic, partition: partition, offset: 0}, poll);
consumer.poll({topic: topic}, poll);
consumer.poll(topic, poll);
// Per the original author's note: `partition` is an event emitter, and this
// callback is called PER partition related to the topic passed into poll.

// Running count of poll() invocations; used only to label log output.
var x = 0;

/**
 * Callback handed to consumer.poll(): logs and commits every message emitted
 * by the given partition emitter, and logs any error it emits.
 *
 * @param {*} err - Error reported while setting up the poll, falsy on success.
 * @param {Object} partition - Emitter exposing `.on()` for 'message' and
 *   'error' events. May be missing when `err` is set.
 */
function poll(err, partition) {
  // Label for this invocation; named so it does not shadow the module-level
  // `consumer` client.
  var consumerId = x++;

  if (err) {
    console.log('error', err);
  }

  // Without an emitter there is nothing to listen on; calling `.on` on
  // undefined would throw a TypeError.
  if (!partition) {
    return;
  }

  partition.on('message', function(offset, message, commit) {
    // console.log(message)
    console.log('received msg for consumer of partition: ' + consumerId + '.', 'offset: ' + offset + '.', message.value);
    commit();
  });

  // partition.on('debug', console.log.bind(null, 'debug ' + consumerId));

  partition.on('error', function(error) {
    console.log('error', JSON.stringify(error));
  });
}
// This connects to the group and uses group management semantics for
// balancing partitions, etc.
consumer.subscribe('my-replicated-partitioned-topic');

consumer.connect(function(err, kafka) {
  if (err) return console.log('ERROR CONNECTING:', err);

  // subscribe only gets events from the committed offset
  // consumer.subscribe('my-replicated-partitioned-topic');
  // consumer.subscribe('my-replicated-partitioned-topic-2');
  consumer.unsubscribe('my-replicated-partitioned-topic-2');

  kafka.on('message', function(message, commit) {
    // console.log(message)
    // This handler only receives (message, commit): there is no `offset`
    // argument in scope, and `consumer` here is the client object rather than
    // a partition label, so only the message value is logged.
    console.log('received msg for subscribed consumer.', message.value);
    commit();
  });

  kafka.on('rebalance.start', function (info) {
  });

  kafka.on('rebalance.end', function (err, info) {
  });

  kafka.on('error', function (kafkaErr) {
    // Record why we are tearing down before dropping the connection.
    console.log('error', kafkaErr);
    consumer.disconnect(); // <--- DISCONNECT FROM ENTIRE CLUSTER
  });

  kafka.on('debug', function (info) {
  });

  // called every time kafkaesque connects to a new broker, with that broker's details
  kafka.on('connect', function () {
  });

  // called every time kafkaesque disconnects from a broker, with that broker's details
  kafka.on('disconnect', function () {
  });

  // called every time kafkaesque reconnects to a broker, with that broker's details
  kafka.on('reconnect', function () {
  });
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment