Skip to content

Instantly share code, notes, and snippets.

@argentinaluiz
Last active July 17, 2023 17:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save argentinaluiz/9c15edc2e5ce1b291a6972805746d8d9 to your computer and use it in GitHub Desktop.
Save argentinaluiz/9c15edc2e5ce1b291a6972805746d8d9 to your computer and use it in GitHub Desktop.
Nestjs with Kafka - Subscribe with wildcard
@MessagePattern('/orders.*/', { flags: 'i' })
import { CustomTransportStrategy, ServerKafka } from '@nestjs/microservices';
import { Consumer } from 'kafkajs';
export class KafkaCustomTransport
extends ServerKafka
implements CustomTransportStrategy
{
override async bindEvents(consumer: Consumer): Promise<void> {
const registeredPatterns = [...this.messageHandlers.entries()].map(
([pattern, handler]) =>
pattern.startsWith('/') && pattern.endsWith('/')
? new RegExp(
pattern.slice(1, pattern.length - 2),
handler.extras.flags,
)
: pattern,
);
const consumerSubscribeOptions = this.options.subscribe || {};
const subscribeToPattern = async (pattern: string) =>
consumer.subscribe({
topic: pattern,
...consumerSubscribeOptions,
});
await Promise.all(registeredPatterns.map(subscribeToPattern));
const consumerRunOptions = Object.assign(this.options.run || {}, {
eachMessage: this.getMessageHandler(),
});
await consumer.run(consumerRunOptions);
}
public override getHandlerByPattern(pattern: string) {
const handler = super.getHandlerByPattern(pattern);
if (handler) {
return handler;
}
return this.getHandlerByRegExp(pattern);
}
private getHandlerByRegExp(pattern: string) {
const route = this.getRouteFromPattern(pattern);
const keys = this.messageHandlers.keys();
for (const key of keys) {
const regexp = new RegExp(key.slice(1, key.length - 1));
if (regexp.test(route)) return this.messageHandlers.get(key);
}
return null;
}
}
app.connectMicroservice<MicroserviceOptions>({
strategy: new KafkaCustomTransport({
client: {
brokers: ['kafka:29092'],
},
consumer: {
groupId: 'payments-consumer',
},
}),
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment