Skip to content

Instantly share code, notes, and snippets.

@AbhilashG97
Last active May 27, 2021 03:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save AbhilashG97/8e48c3f2bcae0aaeb86bd64382988988 to your computer and use it in GitHub Desktop.
Save AbhilashG97/8e48c3f2bcae0aaeb86bd64382988988 to your computer and use it in GitHub Desktop.
Kafka JS producer code
const { Kafka } = require("kafkajs");
const { CompressionTypes, CompressionCodecs } = require("kafkajs");
const LZ4Codec = require("kafkajs-lz4");
CompressionCodecs[CompressionTypes.LZ4] = new LZ4Codec().codec;
var Twitter = require("node-tweet-stream"),
t = new Twitter({
consumer_key: "xxxx",
consumer_secret: "xxxx",
token: "xxxx",
token_secret: "xxxx",
});
const kafka = new Kafka({
clientId: "kafka-tweets",
brokers: ["localhost:9092"],
});
const producer = kafka.producer();
t.on("tweet", async function (tweet) {
// Kafka producer
await producer.connect();
// write tweet to producer
try {
await producer.send({
topic: "tweets_node",
messages: [{ key: "tweets", value: JSON.stringify(tweet) }],
acks: -1,
compression: CompressionTypes.LZ4,
});
} catch (error) {
console.log(error);
}
console.log("tweet received");
});
t.on("error", function (err) {
console.log("Oh no", err);
});
t.track("covid");
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment