Skip to content

Instantly share code, notes, and snippets.

@fs-doc
Last active April 12, 2024 12:39
Show Gist options
  • Save fs-doc/83a6b01550808bf601fec9c3ce7e3bfb to your computer and use it in GitHub Desktop.
Save fs-doc/83a6b01550808bf601fec9c3ce7e3bfb to your computer and use it in GitHub Desktop.
import {KafkaConsumer} from "./kafkaConsumer";
import jwt from 'jsonwebtoken';
/**
 * Example Kafka consumer for a single service.
 *
 * On construction it creates a {@link KafkaConsumer} subscribed to `topic1`
 * and `topic2` under the `serviceGroupRelatedId` consumer group and hands
 * {@link ServiceConsumer.processMessage} to it as the per-message callback.
 */
export class ServiceConsumer {
  private readonly kafkaConsumer: KafkaConsumer;

  constructor() {
    // Create the Kafka consumer instance and subscribe to the topics.
    this.kafkaConsumer = new KafkaConsumer([
      'topic1',
      'topic2'
    ], 'serviceGroupRelatedId'); // Use something like 'paymentsServiceGroup' or 'com.yourdomain.paymentsServiceGroup'

    // The method is passed below as a bare function reference, so it must be
    // bound to this instance to keep `this` usable inside it.
    this.processMessage = this.processMessage.bind(this);

    // Start consuming messages.
    this.kafkaConsumer.consume(this.processMessage);
  }

  /**
   * Message processor callback: validates the payload, authenticates the
   * producer and dispatches on the topic.
   *
   * How malformed or unauthorized messages are handled is a design decision:
   * this example throws, but logging and returning `false`, or forwarding to a
   * dead-letter queue, are equally valid.
   *
   * @param message - Kafka message. `value` is expected to be a JSON string
   *   (or an already-deserialized object) with a `data` property and a `user`
   *   property holding a JWT. `Buffer` values are rejected in this example.
   * @param topic - Topic the message was received on.
   * @returns `true` when the message was processed successfully, `false`
   *   otherwise. The topic branches are placeholders, so the result stays
   *   `false` until service logic assigns it.
   * @throws Error when the payload is missing, is a `Buffer`, lacks `data` or
   *   `user`, when `JWT_KEY` is not configured, or when the decoded token has
   *   no `id`/`role`.
   * @throws SyntaxError when a string payload is not valid JSON.
   * @throws Whatever `jwt.verify` throws for an invalid or expired token.
   */
  async processMessage(message: {value: {data: any, user: {id: string, role: string}} | string | Buffer, key: string | Buffer}, topic: string): Promise<boolean> {
    if (!message || !message.value) {
      throw new Error('Missing payload');
    }

    // The message consists of a key and a value; the value is the actual
    // payload. In this example a raw Buffer is treated as malformed.
    if (message.value instanceof Buffer) {
      throw new Error('Invalid message format');
    }

    // The payload normally arrives as a JSON string. An already-deserialized
    // object is used as-is instead of being coerced to "[object Object]" by
    // JSON.parse.
    const rawValue: unknown = message.value;
    const payload: unknown = typeof rawValue === 'string' ? JSON.parse(rawValue) : rawValue;

    // JSON.parse can legally yield null or a primitive; destructuring those
    // would either crash with a TypeError or silently produce undefined.
    if (typeof payload !== 'object' || payload === null) {
      throw new Error('Missing data in provided payload');
    }
    const {data, user} = payload as {data?: unknown, user?: unknown};
    if (!data || !user) {
      throw new Error('Missing data in provided payload');
    }

    // Authenticate the producer. The user information is provided as a JWT,
    // so verifying it prevents unauthorized messages from being processed.
    const secret = process.env.JWT_KEY;
    if (!secret) {
      // Fail with an explicit configuration error instead of handing
      // `undefined` to the JWT library.
      throw new Error('JWT_KEY environment variable is not set');
    }
    if (typeof user !== 'string') {
      // Only a token string can be verified.
      throw new Error('Unauthorized user');
    }

    // jwt.verify throws on an invalid token; its result may be a plain string
    // for non-JSON payloads, so it is narrowed before the claims are read.
    const userData: unknown = jwt.verify(user, secret);
    if (typeof userData !== 'object' || userData === null) {
      throw new Error('Unauthorized user');
    }
    const claims = userData as {id?: unknown, role?: unknown};
    if (!claims.id || !claims.role) {
      throw new Error('Unauthorized user');
    }

    let response = false;
    switch (topic) {
      case "topic1":
        // Your service-related logic here to process topic 1
        break;
      case "topic2":
        // Your service-related logic here to process topic 2
        break;
      default:
        // Topic this service does not handle: leave the response as false.
        break;
    }

    // false means the message was not processed successfully.
    return response;
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment