Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
import { Kafka, Consumer, Producer, KafkaMessage } from "kafkajs";
import { Logger } from "pino";
import { EventEmitter } from "events";
import uuidv4 from "uuid/v4";
type Options = {
groupId: string;
topicPrefix: string;
logger: Logger;
};
export class Client {
private events: EventEmitter;
private producer: Producer;
private consumer: Consumer;
private topicPrefix: string;
protected logger: Logger;
constructor(kafka: Kafka, options: Options) {
this.events = new EventEmitter();
this.producer = kafka.producer();
// Use random groupId so we consume ALL messages:
this.consumer = kafka.consumer({ groupId: uuidv4() });
this.topicPrefix = options.topicPrefix;
this.logger = options.logger;
}
async start() {
await Promise.all([this.startProducer(), this.startConsumer()]);
}
async stop() {
await Promise.all([this.consumer.disconnect(), this.producer.disconnect()]);
}
async request(command, ...args) {
const id = uuidv4();
// We setup the promise first, because kafka is bloody fast, so if your upstream
// service is faster that this service, then you may miss the message
const response = new Promise((resolve, reject) => {
this.events.once(id, result => {
if (result.error) {
reject(result);
} else {
resolve(result);
}
});
});
await this.producer.send({
topic: `${this.topicPrefix}.requests`,
messages: [
{
key: Buffer.from(id, "ascii"),
value: Buffer.from(
JSON.stringify({ id, timestamp: Date.now(), command, args }),
"utf8"
)
}
]
});
return response;
}
private async startProducer() {
await this.producer.connect();
}
private async startConsumer() {
await this.consumer.connect();
await this.consumer.subscribe({
topic: `${this.topicPrefix}.responses`
});
await this.consumer.run({
eachBatchAutoResolve: false,
eachBatch: async ({
batch,
resolveOffset,
heartbeat,
isRunning,
isStale
}) => {
for (let message of batch.messages) {
if (!isRunning() || isStale()) break;
const key = message.key.toString("utf8");
this.logger.info(
{
offset: message.offset,
key: message.key.toString("utf8")
},
"Processing message at offset %d",
message.offset
);
// If we fail to parse the message, then we log an error and resolve that
// message offset, effectively skipping the message:
let value: { [key: string]: any };
try {
value = JSON.parse(message.value.toString("utf8"));
} catch (err) {
this.logger.error(
{
key: key,
value: message.value.toString("utf8"),
offset: message.offset
},
`Error parsing JSON of message at offset: %d`,
message.offset
);
this.events.emit(key, { error: "Error parsing JSON of message" });
await resolveOffset(message.offset);
await heartbeat();
return;
}
this.events.emit(key, value);
await resolveOffset(message.offset);
await heartbeat();
}
}
});
}
}
{
"dependencies": {
"kafkajs": "^1.10.0",
"pino": "^5.13.2",
"uuid": "^3.3.2"
},
"devDependencies": {
"@types/node": "^12.7.2",
"@types/uuid": "^3.4.5"
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment