Created
August 17, 2019 23:45
-
-
Save ThisIsMissEm/bacfa4b3b116f9d90e778d19ac6ccaaf to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Kafka, Consumer, Producer, KafkaMessage } from "kafkajs"; | |
import { Logger } from "pino"; | |
import { EventEmitter } from "events"; | |
import uuidv4 from "uuid/v4"; | |
/**
 * Settings accepted by the `Client` constructor.
 */
interface Options {
  /**
   * Consumer group id supplied by the caller.
   * NOTE(review): `Client` in this file does not read this field; it
   * generates a random group id instead. Confirm whether it is still needed.
   */
  groupId: string;
  /** Prefix for the `<prefix>.requests` and `<prefix>.responses` topics. */
  topicPrefix: string;
  /** pino logger used for per-message diagnostics. */
  logger: Logger;
}
/**
 * Request/response client on top of two Kafka topics.
 *
 * Requests are produced to `<topicPrefix>.requests`; replies are consumed
 * from `<topicPrefix>.responses` and matched back to the pending request via
 * the message key, which carries the request id.
 */
export class Client {
  /** Internal bus: one event per request id, fired when its reply arrives. */
  private readonly events: EventEmitter;
  private readonly producer: Producer;
  private readonly consumer: Consumer;
  private readonly topicPrefix: string;
  protected logger: Logger;

  /**
   * @param kafka - Kafka instance used to create the producer and consumer.
   * @param options - Topic prefix and logger. `options.groupId` is not read
   *   here: a random group id is generated instead.
   */
  constructor(kafka: Kafka, options: Options) {
    this.events = new EventEmitter();
    this.producer = kafka.producer();

    // Use random groupId so we consume ALL messages:
    this.consumer = kafka.consumer({ groupId: uuidv4() });

    this.topicPrefix = options.topicPrefix;
    this.logger = options.logger;
  }

  /** Connects the producer and starts the response consumer in parallel. */
  async start(): Promise<void> {
    await Promise.all([this.startProducer(), this.startConsumer()]);
  }

  /** Disconnects both the consumer and the producer. */
  async stop(): Promise<void> {
    await Promise.all([this.consumer.disconnect(), this.producer.disconnect()]);
  }

  /**
   * Sends a command to the requests topic and waits for the matching reply.
   *
   * @param command - Command name placed in the request payload.
   * @param args - Arguments forwarded in the request payload (JSON-encoded).
   * @returns The parsed reply payload.
   * @throws Rejects with the reply payload itself when it has a truthy
   *   `error` field, or with the producer error when sending fails.
   */
  async request(command: string, ...args: unknown[]): Promise<unknown> {
    const id = uuidv4();

    // We setup the promise first, because kafka is bloody fast, so if the
    // upstream service is faster than this service, we could otherwise miss
    // the reply.
    const response = new Promise<unknown>((resolve, reject) => {
      this.events.once(id, (result: any) => {
        // Guard against non-object payloads (e.g. a JSON `null`) so that a
        // malformed reply cannot throw from inside the consumer's emit().
        if (result != null && result.error) {
          reject(result);
        } else {
          resolve(result);
        }
      });
    });

    // An error reply can arrive while send() is still pending, before the
    // caller has had a chance to attach a handler. Mark the rejection as
    // handled here; the caller still observes it through `return response`.
    response.catch(() => undefined);

    try {
      await this.producer.send({
        topic: `${this.topicPrefix}.requests`,
        messages: [
          {
            key: Buffer.from(id, "ascii"),
            value: Buffer.from(
              JSON.stringify({ id, timestamp: Date.now(), command, args }),
              "utf8"
            )
          }
        ]
      });
    } catch (err) {
      // The request never left, so no reply will come: drop the listener
      // instead of leaking it.
      this.events.removeAllListeners(id);
      throw err;
    }

    return response;
  }

  private async startProducer(): Promise<void> {
    await this.producer.connect();
  }

  /**
   * Subscribes to the responses topic and dispatches every message to the
   * request waiting on its key. Offsets are resolved manually, one message
   * at a time.
   */
  private async startConsumer(): Promise<void> {
    await this.consumer.connect();

    await this.consumer.subscribe({
      topic: `${this.topicPrefix}.responses`
    });

    await this.consumer.run({
      eachBatchAutoResolve: false,
      eachBatch: async ({
        batch,
        resolveOffset,
        heartbeat,
        isRunning,
        isStale
      }) => {
        for (const message of batch.messages) {
          if (!isRunning() || isStale()) break;

          // Unusable messages are logged and skipped inside handleMessage;
          // either way the offset is resolved and the batch keeps going.
          this.handleMessage(message);

          resolveOffset(message.offset);
          await heartbeat();
        }
      }
    });
  }

  /**
   * Decodes one response message and emits it on the internal bus under its
   * key. Never throws for malformed input: keyless messages are dropped, and
   * unparseable ones are reported to the waiting request as an error.
   */
  private handleMessage(message: KafkaMessage): void {
    const offset = message.offset;

    if (message.key == null) {
      // Without a key there is no request to route the reply to.
      this.logger.warn({ offset }, "Skipping message without a key");
      return;
    }

    const key = message.key.toString("utf8");

    this.logger.info({ offset, key }, "Processing message at offset %d", offset);

    // A record may carry no value at all; treat that like unparseable JSON.
    const raw = message.value == null ? null : message.value.toString("utf8");

    // If we fail to parse the message, then we log an error and let the
    // caller resolve that offset, effectively skipping the message:
    let value: { [key: string]: any };
    try {
      if (raw === null) {
        throw new Error("Message has no value");
      }
      value = JSON.parse(raw);
    } catch (err) {
      this.logger.error(
        { key, value: raw, offset, err },
        `Error parsing JSON of message at offset: %d`,
        offset
      );
      this.events.emit(key, { error: "Error parsing JSON of message" });
      return;
    }

    this.events.emit(key, value);
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{
  "dependencies": {
    "kafkajs": "^1.10.0",
    "pino": "^5.13.2",
    "uuid": "^3.3.2"
  },
  "devDependencies": {
    "@types/node": "^12.7.2",
    "@types/uuid": "^3.4.5"
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment