Michał Chmielarz (mchmielarz)

// create the required number of verticles, one per partition
IntStream.range(0, numberOfPartitions)
    .forEach(partition -> {
      vertx.deployVerticle(
          () -> KafkaPartitionedVerticle.create(topic, partition, kafkaConfig),
          deploymentOptions,
          async -> log.info("Partitioned Kafka consumer deployed. DeploymentId: {}", async.result())
      );
    });

class KafkaVerticle extends AbstractVerticle {
  // initialization
  @Override
  public void start() {
    KafkaConsumer.create(vertx, kafkaConfig)
        .subscribe(topic)
        .handler(record -> {
          // processing logic comes here
        });
  }
}

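// The KafkaPartitionedVerticle used in the deployment loop above is not part of the snippet.
// Below is a minimal sketch of what such a partition-pinned verticle could look like, assuming
// the Vert.x 3.x Kafka client (fluent assign/handler) and a create(...) factory whose signature
// matches the call site; the field types (Map<String, String> config) are assumptions.
class KafkaPartitionedVerticle extends AbstractVerticle {
  private final String topic;
  private final int partition;
  private final Map<String, String> kafkaConfig;

  private KafkaPartitionedVerticle(String topic, int partition, Map<String, String> kafkaConfig) {
    this.topic = topic;
    this.partition = partition;
    this.kafkaConfig = kafkaConfig;
  }

  // factory matching the deployment call site above (assumed signature)
  static KafkaPartitionedVerticle create(String topic, int partition, Map<String, String> kafkaConfig) {
    return new KafkaPartitionedVerticle(topic, partition, kafkaConfig);
  }

  @Override
  public void start() {
    // consume from a single, explicitly assigned partition instead of subscribing to the topic
    KafkaConsumer.<String, String>create(vertx, kafkaConfig)
        .handler(record -> {
          // processing logic comes here
        })
        .assign(new TopicPartition(topic, partition));
  }
}
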
Consumer
    .plainPartitionedSource(consumerSettings, Subscriptions.topics(topicName))
    .mapAsync(maxPartitions, pair -> {
      Source<ConsumerRecord<String, String>, NotUsed> source = pair.second();
      return source
          .map(record -> {
            // processing logic comes here
            return record;
          })
          .runWith(Sink.ignore(), materializer);
    })
    .runWith(Sink.ignore(), materializer);

Consumer
    .committableSource(consumerSettings, Subscriptions.topics(topicName))
    .map(committableMessage -> {
      // processing logic comes here
      return committableMessage;
    })
    .mapAsync(maxParallelism, msg -> CompletableFuture.completedFuture(msg.committableOffset()))
    .runWith(Committer.sink(CommitterSettings.create(committerSettings)), materializer)
    .toCompletableFuture()
    .handle(AppSupport.doneHandler());

Consumer
    .plainSource(consumerSettings, Subscriptions.topics(topicName))
    .map(consumerRecord -> {
      // processing logic comes here
      return consumerRecord;
    })
    .runWith(Sink.ignore(), materializer)
    .toCompletableFuture()
    .handle(AppSupport.doneHandler())
    .join();

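// The consumerSettings and materializer referenced by the Alpakka snippets above are not defined
// there. A minimal sketch of how they could be built, assuming Alpakka Kafka's ConsumerSettings API
// and classic Akka Streams materialization; the actor system name, bootstrap address, and group id
// are placeholder assumptions, not values from the original gist.
ActorSystem system = ActorSystem.create("kafka-consumers");
Materializer materializer = ActorMaterializer.create(system);

ConsumerSettings<String, String> consumerSettings =
    ConsumerSettings.create(system, new StringDeserializer(), new StringDeserializer())
        .withBootstrapServers("localhost:9092")
        .withGroupId("alpakka-group")
        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
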
@KafkaListener(groupId = "micronaut-group", clientId = "${kafka.consumers.micronaut-group.client-id}", batch = true)
public class BatchedMicronautListener {
  @Topic("${kafka.consumers.micronaut-group.topic}")
  void receive(List<ConsumerRecord<String, String>> records) {
    log.info("Batch received: {}", records.size());
    records.forEach(rec -> {
      // processing logic comes here
    });
  }
}

@KafkaListener(groupId = "micronaut-group", clientId = "${kafka.consumers.micronaut-group.client-id}", threads = 5)
public class MultithreadedMicronautListener {
  @Topic("${kafka.consumers.micronaut-group.topic}")
  void receive(@KafkaKey String key, String value, int partition) {
    switch (partition) {
      case 0:
        // processing logic comes here
        break;
      case 1:
        // processing logic comes here
        break;
      default:
        // processing logic comes here
        break;
    }
  }
}

@KafkaListener(groupId = "micronaut-group", clientId = "${kafka.consumers.micronaut-group.client-id}")
public class MicronautListener {
  @Topic("${kafka.consumers.micronaut-group.topic}")
  void receive(@KafkaKey String key, String value) {
    // processing logic comes here
  }
}

@Component
public class PartitionedKafkaListenerConsumer {
  // @KafkaListener accepts exactly one of topics, topicPattern, or topicPartitions;
  // the explicit partition assignment is kept here, so topics is not repeated
  @KafkaListener(
      clientIdPrefix = "part0",
      groupId = "${spring.kafka.consumer.group-id}",
      topicPartitions = {
          @TopicPartition(topic = "${spring.kafka.consumer.topic}", partitions = {"0"})
      })
  void listenToPartition0(ConsumerRecord<String, String> record) {
    // processing logic comes here
  }
}

@Bean
ConcurrentKafkaListenerContainerFactory<String, String> multiThreadedListenerContainerFactory() {
  ConcurrentKafkaListenerContainerFactory<String, String> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
  factory.setConsumerFactory(consumerFactory());
  factory.setConcurrency(3);
  return factory;
}
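
// A listener can be wired to the factory above by referencing its bean name via containerFactory.
// This is a sketch, not part of the original gist; the class name and the topic/group placeholders
// simply follow the convention of the snippets above.
@Component
public class MultiThreadedKafkaListenerConsumer {
  @KafkaListener(
      topics = "${spring.kafka.consumer.topic}",
      groupId = "${spring.kafka.consumer.group-id}",
      containerFactory = "multiThreadedListenerContainerFactory")
  void listen(ConsumerRecord<String, String> record) {
    // handled by up to 3 concurrent listener containers, one consumer thread each
  }
}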