Skip to content

Instantly share code, notes, and snippets.

@tulios
Created October 3, 2018 17:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save tulios/1605365a6c136c9726172a364bdcfc64 to your computer and use it in GitHub Desktop.
Save tulios/1605365a6c136c9726172a364bdcfc64 to your computer and use it in GitHub Desktop.
kafkajs_cluster_consumer_per_worker
const fs = require('fs')
const ip = require('ip')
const cluster = require('cluster')
const { Kafka, logLevel } = require('../index')
const errorTypes = ['unhandledRejection', 'uncaughtException']
const signalTraps = ['SIGTERM', 'SIGINT', 'SIGUSR2']
const host = process.env.HOST_IP || ip.address()
const kafka = new Kafka({
logLevel: logLevel.INFO,
brokers: [`${host}:9094`, `${host}:9097`, `${host}:9100`],
clientId: 'example-consumer',
ssl: {
servername: 'localhost',
cert: fs.readFileSync('./testHelpers/certs/client_cert.pem', 'utf-8'),
key: fs.readFileSync('./testHelpers/certs/client_key.pem', 'utf-8'),
ca: [fs.readFileSync('./testHelpers/certs/ca_cert.pem', 'utf-8')],
},
sasl: {
mechanism: 'plain',
username: 'test',
password: 'testtest',
},
})
const topic = 'topic-test'
const run = async () => {
const consumer = kafka.consumer({ groupId: 'test-group' })
await consumer.connect()
await consumer.subscribe({ topic })
consumer.run({
// eachBatch: async ({ batch }) => {
// console.log(batch)
// },
eachMessage: async ({ topic, partition, message }) => {
const prefix = `${topic}[${partition} | ${message.offset}] / ${message.timestamp}`
console.log(`- ${prefix} ${message.key}#${message.value}`)
},
})
return consumer
}
const numWorkers = 6
if (cluster.isMaster) {
console.log(`Master ${process.pid} is running`)
for (var i = 0; i < numWorkers; i++) {
cluster.fork()
}
cluster.on('exit', worker => {
console.log(`Worker ${worker.process.pid} died`)
})
errorTypes.map(type => {
process.on(type, e => {
try {
console.log(`process.on ${type}`)
console.error(e)
cluster.disconnect(() => {
console.log('disconnected on error')
process.exit(0)
})
} catch (_) {
process.exit(1)
}
})
})
signalTraps.map(type => {
process.once(type, () => {
return new Promise((resolve, reject) => {
try {
cluster.disconnect(() => {
console.log('disconnected')
resolve()
})
} catch (e) {
// process.kill(process.pid, type)
reject(new Error(type))
}
})
})
})
} else {
console.log(`Worker ${process.pid} is running`)
const consumer = run().catch(e => console.error(`[example/consumer] ${e.message}`, e))
errorTypes.map(type => {
process.on(type, async e => {
try {
console.log(`process.on ${type}`)
console.error(e)
await consumer.disconnect()
process.exit(0)
} catch (_) {
process.exit(1)
}
})
})
signalTraps.map(type => {
process.once(type, async () => {
try {
await consumer.disconnect()
} finally {
process.kill(process.pid, type)
}
})
})
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment