Created
June 20, 2016 09:09
-
-
Save GlenTiki/c30ce3e11ac763d3031320e133af7b79 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Example setup: build a kafkaesque client, then exercise the different
// argument shapes accepted by produce() and poll().
var kafkaesque = require('kafkaesque');

var consumer = kafkaesque({
  brokers: [{ host: 'localhost', port: 9094 }]
  // Settings left commented out in this example:
  // clientId: 'fishy'
  // group: 'multiconsumers',
  // maxBytes: 2000000
});

var topic = 'my-replicated-partitioned-topic';
var partition = 0;

// Each of the following produce() call shapes should be valid.
consumer.produce(
  { topic: topic, partition: partition },
  ['message form 1'],
  function () {}
);
consumer.produce({ topic: topic }, 'message form 2', function () {});
consumer.produce(topic, 'message form 3', function () {});

// Each poll() call gets its own event emitter, delivered to the `poll`
// callback declared later in this file.
consumer.poll({ topic: topic, partition: partition }, poll);
consumer.poll({ topic: topic, offset: 0 }, poll);
consumer.poll({ topic: topic, partition: partition, offset: 0 }, poll);
consumer.poll({ topic: topic }, poll);
consumer.poll(topic, poll);
// Running counter used to give every poll() callback invocation its own
// number, so log lines from different pollers can be told apart.
var x = 0;

/**
 * Callback handed to consumer.poll().
 *
 * `partition` is an event emitter; this callback is invoked once PER partition
 * related to the topic that was passed into poll.
 *
 * @param {*} err - Truthy when the poll could not be set up; it is logged and
 *   no handlers are attached.
 * @param {Object} partition - Emitter exposing `on(event, handler)`. Its
 *   'message' handler receives (offset, message, commit) and its 'error'
 *   handler receives the error value.
 */
function poll(err, partition) {
  // Numbered before the error check so ids keep following call order.
  var consumerId = x++;

  if (err) {
    console.log('error', err);
    // Stop here: `partition` may be missing when err is set, and calling
    // partition.on(...) on it would throw.
    return;
  }

  partition.on('message', function (offset, message, commit) {
    // console.log(message)
    console.log(
      'received msg for consumer of partition: ' + consumerId + '.',
      'offset: ' + offset + '.',
      message.value
    );
    commit();
  });

  // partition.on('debug', console.log.bind(null, 'debug ' + consumerId));

  partition.on('error', function (error) {
    // JSON.stringify() renders Error instances as "{}" because their fields
    // are non-enumerable, so format those via String() instead.
    var text = error instanceof Error ? String(error) : JSON.stringify(error);
    console.log('error', text);
  });
}
// This connects to the group and uses group management semantics for balancing
// partitions, etc.
consumer.subscribe('my-replicated-partitioned-topic');

// Connect and wire up handlers for every event shown in this example.
// The callback receives (err, kafka); `kafka` is the emitter the handlers
// below are attached to.
consumer.connect(function (err, kafka) {
  if (err) return console.log('ERROR CONNECTING:', err);

  // subscribe only gets events from the committed offset
  // consumer.subscribe('my-replicated-partitioned-topic');
  // consumer.subscribe('my-replicated-partitioned-topic-2');
  consumer.unsubscribe('my-replicated-partitioned-topic-2');

  kafka.on('message', function (message, commit) {
    // console.log(message)
    // This handler only receives (message, commit): there is no `offset`
    // argument in scope here, and `consumer` is the client object rather than
    // a partition id, so both values are read from the message instead.
    // NOTE(review): assumes the message carries `partition` and `offset`
    // fields - confirm against kafkaesque's message shape.
    console.log(
      'received msg for consumer of partition: ' + message.partition + '.',
      'offset: ' + message.offset + '.',
      message.value
    );
    commit();
  });

  kafka.on('rebalance.start', function (info) {
  });

  kafka.on('rebalance.end', function (err, info) {
  });

  kafka.on('error', function (err) {
    // Report the failure before tearing down instead of dropping it silently.
    console.log('error', err);
    consumer.disconnect(); // <--- DISCONNECT FROM ENTIRE CLUSTER
  });

  kafka.on('debug', function (info) {
  });

  // called every time kafkaesque connects to a new broker, with that broker's details
  kafka.on('connect', function () {
  });

  // called every time kafkaesque disconnects from a broker, with that broker's details
  kafka.on('disconnect', function () {
  });

  // called every time kafkaesque reconnects to a broker, with that broker's details
  kafka.on('reconnect', function () {
  });
});
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment