Skip to content

Instantly share code, notes, and snippets.

@dwelch2344
Created October 2, 2018 20:05
Show Gist options
  • Save dwelch2344/fabec47e0da62e8bc9578b15d7e845c9 to your computer and use it in GitHub Desktop.
Save dwelch2344/fabec47e0da62e8bc9578b15d7e845c9 to your computer and use it in GitHub Desktop.
kafka avro nodejs example
const registryUrl = 'http://schema-registry:8081'
const avro = require('avsc')
const registry = require('avro-schema-registry')(registryUrl)
const kafka = require('kafka-node')
const client = new kafka.KafkaClient({ kafkaHost: 'kafka:9092' })
const topic = {
topic: 'actor7',
offset: 3695
}
const opts = {
// groupId: 'dummy-' + new Date().getTime(),
groupId: 'dummy',
fromOffset: true
}
// don't make this go up linter
;(async () => {
const results = []
// ensure we connect
await client.once('connect', msg => {
console.log('connect', { msg })
})
// wire up our stuff
const consumer = new kafka.Consumer(client, [topic], opts)
consumer.on('error', err => {
console.warn(err)
})
consumer.on('message', async __message => {
const key = await registry.decode(new Buffer(__message.key))
const value = await registry.decode(new Buffer(__message.value))
results.push({ __message, key, value })
})
console.log('starting Timer')
setTimeout(() => {
console.log(results)
}, 5 * 1000)
})()
const registryUrl = 'http://schema-registry:8081'
const avro = require('avsc')
const registry = require('avro-schema-registry')(registryUrl)
const kafka = require('kafka-node')
const request = require('request')
const client = new kafka.KafkaClient({ kafkaHost: 'kafka:9092' })
const topic = {
topic: 'actors7'
}
const opts = {
// Configuration for when to consider a message as acknowledged, default 1
requireAcks: 1,
// The amount of time in milliseconds to wait for all acks before considered, default 100ms
ackTimeoutMs: 100,
// Partitioner type (default = 0, random = 1, cyclic = 2, keyed = 3, custom = 4), default 0
partitionerType: 3
}
const customPartitioners = undefined
// const customPartitioners = [{...}]
// don't make this go up linter
;(async () => {
const schemas = {
key: await fetchSchema(registryUrl, `${topic.topic}-key`, 1),
value: await fetchSchema(registryUrl, `${topic.topic}-value`, 1)
}
// ensure we connect
await client.once('connect', msg => {
console.log('connect', { msg })
})
console.log('connected')
const producer = new kafka.Producer(client, opts, customPartitioners)
console.log('Producer created', producer)
producer.on('error', err => {
console.warn('Producer error', err)
})
// producer.on('ready', async () => {
console.log('producer ready')
const payloads = []
for (let id = 0; id < 10; id++) {
const k = {
id: id.toString(),
timestamp: `2017-05-26 14:${id}:57.62`
}
const v = {
id,
firstName: 'First' + id,
lastName: 'Last' + id,
lastUpdate: k.timestamp
}
const key = await registry.encodeKey(topic.topic, schemas.key, k)
const value = await registry.encodeMessage(topic.topic, schemas.value, v)
// const value = await registry.encodeMessage(topic.topic, schemas.value, v)
payloads.push({
topic: topic.topic,
messages: [new kafka.KeyedMessage(key, value)]
})
}
producer.send(payloads, function(err, data) {
if (err) {
console.warn('errored', err)
} else {
console.log(data)
}
})
})()
function fetchSchema(registryUrl, topic, version) {
return new Promise((resolve, reject) => {
request(
`${registryUrl}/subjects/${topic}/versions/${version}/schema`,
(err, res, body) => {
if (res.statusCode !== 200) {
const error = JSON.parse(data)
return reject(
new Error(
`Schema registry error: ${error.error_code} - ${error.message}`
)
)
}
resolve(JSON.parse(body))
}
)
})
}
{
"avro-schema-registry": "^1.1.0",
"avsc": "^5.4.3",
"kafka-node": "3.0.1",
"request": "2.88.0"
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment