@rajeshkumaravel
Last active November 13, 2019 13:27
Kafka Node - Consumer pause
// consumer1.js
const kafka = require('kafka-node');
const CONFIG = require('../config/index');
const _TOPIC = CONFIG.KAFKA.TOPICS.TOPIC_1;
try {
  const { Consumer } = kafka;
  const client = new kafka.KafkaClient(CONFIG.KAFKA.SERVER);
  const consumer = new Consumer(
    client,
    [{ topic: _TOPIC, partition: 0 }],
    {
      autoCommit: true,
      fetchMaxWaitMs: 1000,
      fetchMaxBytes: 1024 * 1024,
      encoding: 'utf8',
      keyEncoding: 'utf8',
      fromOffset: false,
    },
  );
  consumer.on('message', async (message) => {
    console.log(`Message received at ${_TOPIC}. Message [ `, message.value, ' ]');
  });
  consumer.on('error', (err) => {
    console.log(`${_TOPIC} topic error =>`, err);
  });
} catch (e) {
  console.log(e);
}
// +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
// main.js
app.get('/api/v1/pause/:topicName', (req, res) => {
  // Need to pause the consumer from here
});
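
One possible way to reach the consumer from the route is to keep a reference to it in a shared module, keyed by topic name. The sketch below assumes the kafka-node `Consumer` methods `pause()` and `resume()`; the names `registerConsumer`, `setPaused`, and `pauseHandler` are hypothetical helpers and not part of the original gist or of kafka-node.

```javascript
// consumerRegistry.js (hypothetical shared module, an assumption for illustration)

// Maps topic name -> consumer instance so other modules can look it up.
const consumers = new Map();

// Call this from consumer1.js right after the Consumer is created.
function registerConsumer(topic, consumer) {
  consumers.set(topic, consumer);
}

// Pauses or resumes the consumer registered for `topic`.
// Returns false when no consumer is registered under that name.
function setPaused(topic, paused) {
  const consumer = consumers.get(topic);
  if (!consumer) {
    return false;
  }
  if (paused) {
    consumer.pause(); // assumed kafka-node Consumer#pause: stops fetching
  } else {
    consumer.resume(); // assumed kafka-node Consumer#resume: fetching continues
  }
  return true;
}

// Express-style route handler for GET /api/v1/pause/:topicName.
function pauseHandler(req, res) {
  const topic = req.params.topicName;
  const found = setPaused(topic, true);
  res.status(found ? 200 : 404).json({ topic, paused: found });
}
```

With these helpers exported from the shared module, consumer1.js would call `registerConsumer(_TOPIC, consumer)` inside its `try` block, and main.js could pass `pauseHandler` as the callback for the `/api/v1/pause/:topicName` route. This only works when the consumer and the HTTP server run in the same Node.js process; separate processes would need some other signalling mechanism.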