Skip to content

Instantly share code, notes, and snippets.

@tulios
Created October 8, 2018 18:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tulios/cfba2d2c3b8112bcfb98dcf12789cfde to your computer and use it in GitHub Desktop.
Save tulios/cfba2d2c3b8112bcfb98dcf12789cfde to your computer and use it in GitHub Desktop.
Unit test for KafkaJS LZ4 compression
const createProducer = require('../../producer')
const createConsumer = require('../index')
const { Types, Codecs } = require('../../protocol/message/compression')
const LZ4 = require('kafkajs-lz4')
Codecs[Types.LZ4] = new LZ4().codec
const {
secureRandom,
createCluster,
createTopic,
newLogger,
createModPartitioner,
waitForMessages,
testIfKafka011,
waitForConsumerToJoinGroup,
} = require('testHelpers')
describe('Consumer', () => {
let topicName, groupId, cluster, producer, consumer
beforeEach(async () => {
topicName = `test-topic-${secureRandom()}`
groupId = `consumer-group-id-${secureRandom()}`
await createTopic({ topic: topicName })
})
afterEach(async () => {
await consumer.disconnect()
await producer.disconnect()
})
testIfKafka011('consume LZ4 messages with 0.11 format', async () => {
cluster = createCluster({ allowExperimentalV011: true })
producer = createProducer({
cluster,
createPartitioner: createModPartitioner,
logger: newLogger(),
})
consumer = createConsumer({
cluster,
groupId,
maxWaitTimeInMs: 100,
logger: newLogger(),
})
await consumer.connect()
await producer.connect()
await consumer.subscribe({ topic: topicName, fromBeginning: true })
const messagesConsumed = []
consumer.run({ eachMessage: async event => messagesConsumed.push(event) })
await waitForConsumerToJoinGroup(consumer)
const key1 = secureRandom()
const message1 = {
key: `key-${key1}`,
value: `value-${key1}`,
headers: { [`header-${key1}`]: `header-value-${key1}` },
}
const key2 = secureRandom()
const message2 = {
key: `key-${key2}`,
value: `value-${key2}`,
headers: { [`header-${key2}`]: `header-value-${key2}` },
}
await producer.send({
acks: 1,
topic: topicName,
compression: Types.LZ4,
messages: [message1, message2],
})
await expect(waitForMessages(messagesConsumed, { number: 2 })).resolves.toEqual([
{
topic: topicName,
partition: 0,
message: expect.objectContaining({
key: Buffer.from(message1.key),
value: Buffer.from(message1.value),
headers: {
[`header-${key1}`]: Buffer.from(message1.headers[`header-${key1}`]),
},
magicByte: 2,
offset: '0',
}),
},
{
topic: topicName,
partition: 0,
message: expect.objectContaining({
key: Buffer.from(message2.key),
value: Buffer.from(message2.value),
headers: {
[`header-${key2}`]: Buffer.from(message2.headers[`header-${key2}`]),
},
magicByte: 2,
offset: '1',
}),
},
])
})
})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment