Skip to content

Instantly share code, notes, and snippets.

@llimacruz
Created January 9, 2020 14:34
Show Gist options
  • Save llimacruz/6e8aac2f8fa36723a82c885d4e2e1b6d to your computer and use it in GitHub Desktop.
Streams - branch
// Kafka consumer: joins the 'node-consumer' consumer group on the topic
// named by the first CLI argument and logs every message received.
const kafka = require('kafka-node');
const { ConsumerGroup } = kafka;

const topic = process.argv[2];
if (!topic) {
  // Fail fast with a usage hint instead of creating a consumer on
  // an undefined topic.
  console.error('Usage: node <script> <topic>');
  process.exit(1);
}
console.log('topic name:', topic);

const consumer = new ConsumerGroup(
  { kafkaHost: 'localhost:9092', groupId: 'node-consumer' },
  topic
);

let count = 1;
consumer.on('message', (message) => {
  console.log(count++, 'Recebi a mensagem: ', message.value);
});

// Without an 'error' listener, any connection failure becomes an
// unhandled 'error' event and crashes the process.
consumer.on('error', (err) => console.error('Consumer error:', err));
// Kafka producer: connects to the local broker and, once ready,
// publishes a batch of randomly-generated payment messages
// (see processManyMessages).
const { random } = require('lodash'); // was `var`; never reassigned
const kafka = require('kafka-node');
const ObjectID = require("bson-objectid");
const { HighLevelProducer } = kafka;

const client = new kafka.KafkaClient({ kafkaHost: '127.0.0.1:9092' });
const producer = new HighLevelProducer(client);

producer.on('ready', () => {
  console.log('Iniciado!');
  processManyMessages();
});

// Surface connection/produce failures instead of dying on an
// unhandled 'error' event.
producer.on('error', (err) => console.error('Producer error:', err));

// Candidate values randomly assigned to each generated payment.
const deliveryEngines = ['xewards', 'xdh', 'coupon'];
const experiences = ['itau', 'bb', 'bradesco', 'marisa'];
// Publishes N random payment messages to the 'payments' topic, where N is
// the first CLI argument. A missing or non-numeric argument yields zero
// sends (the original compared the loop index against the raw argv string,
// relying on implicit coercion — made explicit here, same outcome).
const processManyMessages = () => {
  const total = Number(process.argv[2]) || 0;
  for (let i = 0; i < total; i++) {
    const paymentId = ObjectID();
    const messages = [
      JSON.stringify({
        id: paymentId,
        // lodash random(min, max) bounds are inclusive, so these cover
        // every entry of the two arrays.
        experience: experiences[random(0, 3)],
        deliveryEngine: deliveryEngines[random(0, 2)],
      }),
    ];
    // One send() per message, fire-and-forget, matching the original.
    producer.send([{ topic: 'payments', messages }], (err, data) => {
      if (err) {
        console.log('err', err);
        return;
      }
      console.log(data);
    });
  }
};
// Stream-processing client setup for the branching demo below.
const { KafkaStreams } = require("kafka-streams");
// Client configuration. "noptions" appear to be passed to the native
// librdkafka-backed client (option names match librdkafka's) — TODO confirm
// against the kafka-streams docs; "tconf" are per-topic settings;
// "batchOptions" control how consumed messages are batched and committed.
const config = {
"noptions": {
// Broker address and consumer-group/client identity.
"metadata.broker.list": "localhost:9092",
"group.id": "kafka-streams-test-native1",
"client.id": "kafka-streams-test-name-native",
"event_cb": true,
"compression.codec": "snappy",
"api.version.request": true,
"socket.keepalive.enable": true,
"socket.blocking.max.ms": 100,
// Auto-commit is off: commits are driven by batchOptions below.
"enable.auto.commit": false,
"auto.commit.interval.ms": 100,
"heartbeat.interval.ms": 250,
"retry.backoff.ms": 250,
// Fetch/queue tuning — small thresholds suited to a local test load.
"fetch.min.bytes": 100,
"fetch.message.max.bytes": 2 * 1024 * 1024,
"queued.min.messages": 100,
"fetch.error.backoff.ms": 100,
"queued.max.messages.kbytes": 50,
"fetch.wait.max.ms": 1000,
"queue.buffering.max.ms": 1000,
"batch.num.messages": 10000
},
"tconf": {
// Read from the earliest available offset when no committed offset exists.
"auto.offset.reset": "earliest",
"request.required.acks": 1
},
"batchOptions": {
// Handle 10 messages per batch, committing (async) after every batch.
"batchSize": 10,
"commitEveryNBatch": 1,
"concurrency": 1,
"commitSync": false,
"noBatchCommits": false
}
}
const kafkaStreams = new KafkaStreams(config);
// Client-level failure hook ('Deu ruim' ≈ 'something went wrong').
kafkaStreams.on("error", (error) => console.error('Deu ruim:', error));
// Branch predicate: true when the payment's deliveryEngine is 'coupon'.
const filterBranchCoupon = message => {
  const { deliveryEngine } = JSON.parse(message.value);
  return deliveryEngine === 'coupon';
}
// Branch predicate: true when the payment's deliveryEngine is 'xdh'.
const filterBranchXDH = message => {
  const { deliveryEngine } = JSON.parse(message.value);
  return deliveryEngine === 'xdh';
}
// Branch predicate for the first 'xewards' partition: takes every 'itau'
// payment, plus any other xewards payment whose id ends in a character
// with an odd char code.
const filterBranchX1 = message => {
  const payment = JSON.parse(message.value);
  if (payment.deliveryEngine !== 'xewards') {
    return false;
  }
  if (payment.experience === 'itau') {
    return true;
  }
  const lastCode = payment.id.charCodeAt(payment.id.length - 1);
  return lastCode % 2 === 1;
}
// Branch predicate for the second 'xewards' partition: the complement of
// filterBranchX1 within xewards — never 'itau', otherwise payments whose
// id ends in a character with an even char code.
const filterBranchX2 = message => {
  const payment = JSON.parse(message.value);
  if (payment.deliveryEngine !== 'xewards') {
    return false;
  }
  if (payment.experience === 'itau') {
    return false;
  }
  const lastCode = payment.id.charCodeAt(payment.id.length - 1);
  return lastCode % 2 === 0;
}
// Source stream over the 'payments' topic (produced by the script above).
const kafkaTopicName = "payments";
const consumerEntrada = kafkaStreams.getKStream(kafkaTopicName);
// branch() splits the stream into one sub-stream per predicate; the
// destructured names line up positionally with the filter array below.
// coupon/xdh/xewards are mutually exclusive by deliveryEngine, and the
// two xewards filters partition that traffic between them.
const [
branchCoupon,
branchXDH,
branchXewards1,
branchXewards2
] =
consumerEntrada.branch([
filterBranchCoupon,
filterBranchXDH,
filterBranchX1,
filterBranchX2
]);
// Routes one branch stream to its destination topic (1 partition, 'buffer'
// payloads), logging each message under the given label for tracing.
// Extracted from four copy-pasted pipelines that differed only in stream,
// label, and target topic.
const routeBranch = (branch, label, targetTopic) => branch
  .mapJSONConvenience()
  .mapWrapKafkaValue()
  .tap((msg) => console.log(label, JSON.stringify(msg)))
  .to(targetTopic, 1, "buffer");

const producerPromiseCoupon = routeBranch(branchCoupon, "Coupon", "coupon");
const producerPromiseXDH = routeBranch(branchXDH, "XDH ", "xdh");
const producerPromiseXewards1 = routeBranch(branchXewards1, "X1 ", "xewards1");
const producerPromiseXewards2 = routeBranch(branchXewards2, "X2 ", "xewards2");
// Start the topology: wait for the four branch producers and the source
// consumer to be ready before declaring the stream started.
const startupTasks = [
  producerPromiseCoupon,
  producerPromiseXDH,
  producerPromiseXewards1,
  producerPromiseXewards2,
  consumerEntrada.start(),
];
Promise.all(startupTasks).then(
  () => {
    console.log("Stream started, as kafka consumer and producers are ready.");
  },
  (error) => {
    console.log("Streaming operation failed to start: ", error);
  }
);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment