Skip to content

Instantly share code, notes, and snippets.

@jkomyno
Created February 23, 2022 02:12
Show Gist options
  • Save jkomyno/484e7252c1272910bf45661f2e2d542f to your computer and use it in GitHub Desktop.
Save jkomyno/484e7252c1272910bf45661f2e2d542f to your computer and use it in GitHub Desktop.
fastify-kafka.ts
import { FastifyPluginCallback } from 'fastify';
import fp from 'fastify-plugin';
import kafka, { ProducerGlobalConfig, Producer } from 'node-rdkafka';
/**
 * Small facade over a node-rdkafka `Producer` that exposes a single
 * `send` operation for string payloads.
 */
export class KafkaProducer {
  private producer: Producer;

  constructor(producer: Producer) {
    this.producer = producer;
  }

  /**
   * Hands one message to the wrapped producer.
   *
   * Declared as an arrow-function property, so `this` stays bound to the
   * instance even when `send` is passed around detached from it.
   *
   * @param topic - Topic name forwarded as the first `produce` argument.
   * @param key - Message key forwarded to `produce`.
   * @param payload - String body; converted to a `Buffer` before producing.
   */
  send = <T extends string>({ topic, key, payload }: { topic: string; key: string; payload: T }) => {
    const body = Buffer.from(payload);
    // Partition is passed as `null` — presumably this defers the partition
    // choice to the library; confirm against the node-rdkafka documentation.
    const partition = null;
    // The current wall-clock time (ms since epoch) is sent as the timestamp argument.
    this.producer.produce(topic, partition, body, key, Date.now());
  };
}
// Module augmentation: extends Fastify's `FastifyRequest` type with a
// `kafka` property so handlers can use `request.kafka` with full typing.
declare module 'fastify' {
interface FastifyRequest {
/** Kafka producer wrapper; assigned to every request by this file's `onRequest` hook. */
kafka: KafkaProducer;
}
}
/**
 * Fastify plugin that creates one node-rdkafka producer and ties its
 * lifecycle to the server:
 *
 * - `onReady`: connects the producer and waits until it reports ready
 *   (or rejects on the first connection error).
 * - `onClose`: disconnects the producer if it is connected.
 * - `onRequest`: exposes a shared `KafkaProducer` wrapper as `request.kafka`.
 *
 * @param fastify - Fastify instance the hooks and decorator are registered on.
 * @param options - Producer configuration, passed unchanged to `new kafka.Producer(...)`.
 * @param done - Callback signalling that plugin registration has finished.
 */
const fastifyKafkaPlugin: FastifyPluginCallback<ProducerGlobalConfig> = (fastify, options, done) => {
  const producer = new kafka.Producer(options);

  fastify.addHook('onReady', async () => {
    // Hook triggered before the server starts listening for requests.
    await new Promise<void>((resolve, reject) => {
      let settled = false;

      // Remove both one-shot listeners once the outcome is known, so a later
      // runtime `event.error` is not misreported as a connection error.
      function detach(): void {
        producer.removeListener('ready', onReady);
        producer.removeListener('event.error', onError);
      }

      function onReady(): void {
        if (settled) return;
        settled = true;
        detach();
        fastify.log.info('kafka:ready');
        resolve();
      }

      function onError(kafkaError: unknown): void {
        if (settled) return;
        settled = true;
        detach();
        fastify.log.error({ kafkaError }, 'kafka:connection-error');
        reject(kafkaError);
      }

      // Listeners are attached before connecting so no event can be missed.
      producer.once('ready', onReady);
      producer.once('event.error', onError);

      // The connect callback is also observed: a failure reported only through
      // it (and not via `event.error`) would otherwise leave this hook pending.
      producer.connect(undefined, (err) => {
        if (err) {
          onError(err);
        }
      });
    });
  });

  fastify.addHook('onClose', async () => {
    // Nothing to tear down if the producer never connected (e.g. `onReady` failed).
    if (!producer.isConnected()) {
      return;
    }

    await new Promise<void>((resolve, reject) => {
      producer.disconnect(err => {
        if (err) {
          fastify.log.error({ err }, 'kafka:disconnect-error');
          reject(err);
        } else {
          // A clean disconnect is informational, not an error.
          fastify.log.info('kafka:disconnect');
          resolve();
        }
      });
    });
  });

  const kafkaProducer = new KafkaProducer(producer);

  // Declare the property up front with a `null` placeholder; the real value
  // is attached per request in the `onRequest` hook below.
  fastify.decorateRequest('kafka', null);
  fastify.addHook('onRequest', async (request) => {
    request.kafka = kafkaProducer;
  });

  done();
}
/**
 * Public export: `fastifyKafkaPlugin` wrapped with `fastify-plugin` (`fp`).
 * NOTE(review): per the fastify-plugin documentation this wrapper makes the
 * plugin's decorators and hooks apply to the registering scope rather than an
 * encapsulated child context — confirm against the installed version.
 */
export const fastifyKafka = fp(fastifyKafkaPlugin);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment