Skip to content

Instantly share code, notes, and snippets.

@domderen
Created December 12, 2018 17:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save domderen/99b727c6cef6811e213b4969539a2bce to your computer and use it in GitHub Desktop.
Save domderen/99b727c6cef6811e213b4969539a2bce to your computer and use it in GitHub Desktop.
// Kafka topic name, read as `config.topic` by the scripts that require this module.
const topic = 'SomeTopic4';

module.exports = { topic };
{
"name": "kafka-test",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"author": "",
"license": "ISC",
"dependencies": {
"async": "^2.6.1",
"kafka-node": "^3.0.1",
"uuid": "^3.3.2"
}
}
var async = require('async');
var ConsumerGroup = require('kafka-node').ConsumerGroup;
const config = require('./config');
// Connection string handed to the consumer group as `host`; the ZOOKEEPER_HOST
// environment variable overrides the local default.
const ZOOKEEPER_HOST = process.env.ZOOKEEPER_HOST || '127.0.0.1:2181';
console.log('RUNNING3', ZOOKEEPER_HOST, config.topic);

// Topics this consumer group subscribes to.
const topics = [config.topic];

// Options common to the group; the per-consumer id is merged in separately below.
const consumerOptions = {
  host: ZOOKEEPER_HOST,
  groupId: 'ExampleTestGroup',
  sessionTimeout: 15000,
  protocol: ['roundrobin'],
  // Equivalent of auto.offset.reset; valid values are 'none', 'latest', 'earliest'.
  fromOffset: 'earliest'
};

// Copy the shared options onto a fresh object that already carries this consumer's id.
const consumer3Options = Object.assign({ id: 'consumer3' }, consumerOptions);
const consumerGroup3 = new ConsumerGroup(consumer3Options, topics);

// onError / onMessage are function declarations later in the file; hoisting
// makes them available here. Registration order: 'error' first, then 'message'.
const listeners = { error: onError, message: onMessage };
Object.keys(listeners).forEach(function (eventName) {
  consumerGroup3.on(eventName, listeners[eventName]);
});
/**
 * 'error' event listener for the consumer group.
 *
 * Writes the received value to stderr, followed by its `stack` property
 * on a second call.
 *
 * @param {Error} error - value delivered with the 'error' event.
 */
function onError (error) {
  console.error(error);
  const trace = error.stack;
  console.error(trace);
}
/**
 * 'message' event listener for the consumer group.
 *
 * Prints the received message object to stdout unchanged.
 *
 * @param {*} message - value delivered with the 'message' event.
 */
function onMessage (message) {
  const received = message;
  console.log(received);
}
// Graceful shutdown on the first SIGINT (Ctrl-C): close every consumer, then exit.
// Registered with `once`, so the listener is removed after it fires.
process.once('SIGINT', function () {
  async.each([consumerGroup3], function (consumer, callback) {
    // `true` is passed as close()'s first argument (the kafka-node docs
    // describe it as forcing an offset commit before closing).
    consumer.close(true, callback);
  }, function (err) {
    // Completion callback: without it a failed close() was dropped silently
    // and the process never exited explicitly.
    if (err) {
      console.error(err);
    }
    process.exit(err ? 1 : 0);
  });
});
var kafka = require('kafka-node');
const config = require('./config');
var Producer = kafka.Producer;
var KeyedMessage = kafka.KeyedMessage;
var Client = kafka.Client;
// Kafka broker address; the KAFKA_HOST environment variable overrides the
// local default.
const KAFKA_HOST = process.env.KAFKA_HOST || '127.0.0.1:9092';
console.log('RUNNING3', KAFKA_HOST, config.topic);

const client = new kafka.KafkaClient({ kafkaHost: KAFKA_HOST });
const producer = new Producer(client, { requireAcks: 1 });

// Once the producer reports 'ready', send one plain and one keyed message to
// partition 0 of the configured topic, then terminate the process.
producer.on('ready', function () {
  const message = 'a message';
  const keyedMessage = new KeyedMessage('keyed', 'a keyed message');
  const payloads = [
    { topic: config.topic, partition: 0, messages: [message, keyedMessage], attributes: 0 }
  ];

  producer.send(payloads, function (err, result) {
    // Report failures on stderr and through a non-zero exit status; the
    // earlier version always exited with status 0, even when the send failed.
    if (err) {
      console.error(err);
    } else {
      console.log(result);
    }
    process.exit(err ? 1 : 0);
  });
});

// Errors emitted by the producer itself are only logged.
producer.on('error', function (err) {
  console.log('error', err);
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment