Last active
April 12, 2024 12:39
-
-
Save fs-doc/83a6b01550808bf601fec9c3ce7e3bfb to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import {KafkaConsumer} from "./kafkaConsumer"; | |
import jwt from 'jsonwebtoken'; | |
export class ServiceConsumer{ | |
private kafkaConsumer: KafkaConsumer; | |
constructor(){ | |
//Create the Kafka consumer instance and subscribe to the topics | |
this.kafkaConsumer = new KafkaConsumer([ | |
'topic1', | |
'topic2' | |
], 'serviceGroupRelatedId'); //Use something like 'paymentsServiceGroup' or 'com.yourdomain.paymentsServiceGroup' | |
//Bind the processMessage method to the class | |
this.processMessage = this.processMessage.bind(this); | |
//Start consuming messages | |
this.kafkaConsumer.consume(this.processMessage); | |
} | |
//Message processor callback. | |
async processMessage(message: {value: {data: any, user: {id: string, role: string}} | Buffer, key: string | Buffer}, topic: string){ | |
if(!message || !message.value){ | |
//This logic depends on how you want to handle malformed messages | |
//You can log the error and return false, throw an error to kill the process or send a message to a dead-letter queue | |
throw new Error('Missing payload'); | |
} | |
//The message consists of a key and a value. The value is the actual payload. Check the producer to see how the message is being created. | |
//The payload contains two properties: data and user. | |
//In this example if message is a Buffer, we're throwing an error. | |
//This logic depends on how you want to handle malformed messages | |
if (message.value instanceof Buffer) { | |
throw new Error('Invalid message format'); | |
} | |
//Remember, the data comes in string format | |
const {data, user} = JSON.parse(message.value); | |
if(!data || !user){ | |
//This logic depends on how you want to handle malformed messages | |
//You can log the error and return false, throw an error to kill the process or send a message to a dead-letter queue | |
throw new Error('Missing data in provided payload'); | |
} | |
//Perform user authentication here | |
//In this example, the user data provided is in JWT format, so you can decode it using a JWT library | |
//It is always useful to verify the identity of the user that has produced the message, to prevent unauthorized messages from being processed. | |
const userData = jwt.verify(user, process.env.JWT_KEY!); | |
if (!userData || !userData.id || !userData.role) { | |
//This logic depends on how you want to handle unauthorized messages | |
//You can log the error and return false, throw an error to kill the process or send a message to a dead-letter queue | |
throw new Error('Unauthorized user'); | |
} | |
let response: boolean = false; | |
switch (topic) { | |
case "topic1": | |
//Your service-related logic here to process topic 1 | |
break; | |
case "topic2": | |
//Your service-related logic here to process topic 2 | |
break; | |
} | |
//We're returning false if the message was not processed successfully. | |
return response; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment