partitioned_kafka_vertx.java
// create the required number of verticles, one per partition
IntStream.range(0, numberOfPartitions)
    .forEach(partition -> {
        vertx.deployVerticle(
            () -> KafkaPartitionedVerticle.create(topic, partition, kafkaConfig),
            deploymentOptions,
            async -> log.info("Partitioned Kafka consumer deployed. DeploymentId: {}", async.result())
        );
    });
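KafkaPartitionedVerticle itself is not part of the snippet. A minimal sketch of what such a partition-bound verticle might look like, assuming the Vert.x Kafka client and a simple static factory (the class body below is an assumption, not the original implementation):

import io.vertx.core.AbstractVerticle;
import io.vertx.kafka.client.common.TopicPartition;
import io.vertx.kafka.client.consumer.KafkaConsumer;

import java.util.Map;

class KafkaPartitionedVerticle extends AbstractVerticle {

    private final String topic;
    private final int partition;
    private final Map<String, String> kafkaConfig;

    private KafkaPartitionedVerticle(String topic, int partition, Map<String, String> kafkaConfig) {
        this.topic = topic;
        this.partition = partition;
        this.kafkaConfig = kafkaConfig;
    }

    // hypothetical factory matching the call in the deployment snippet above
    static KafkaPartitionedVerticle create(String topic, int partition, Map<String, String> kafkaConfig) {
        return new KafkaPartitionedVerticle(topic, partition, kafkaConfig);
    }

    @Override
    public void start() {
        // assign() pins this consumer to a single partition instead of a group-managed subscription
        KafkaConsumer.<String, String>create(vertx, kafkaConfig)
            .handler(record -> {
                // processing logic comes here
            })
            .assign(new TopicPartition(topic, partition));
    }
}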
simple_kafka_vertx.java
class KafkaVerticle extends AbstractVerticle {
    // initialization of topic and kafkaConfig omitted

    @Override
    public void start() {
        KafkaConsumer.create(vertx, kafkaConfig)
            .subscribe(topic)
            .handler(record -> {
                // processing logic comes here
            });
    }
}
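Both verticles rely on a kafkaConfig map that the snippets do not show. A minimal sketch using the standard Kafka consumer properties; the broker address and group id are placeholders:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.HashMap;
import java.util.Map;

// hypothetical configuration for the verticles above
Map<String, String> kafkaConfig = new HashMap<>();
kafkaConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
kafkaConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
kafkaConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
kafkaConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "vertx-group");
kafkaConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");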
partitioned_kafka_akka.java
Consumer
    .plainPartitionedSource(consumerSettings, Subscriptions.topics(topicName))
    .mapAsync(maxPartitions, pair -> {
        // each pair is (TopicPartition, per-partition source of records)
        Source<ConsumerRecord<String, String>, NotUsed> source = pair.second();
        return source
            .map(record -> {
                // processing logic comes here
                return record;
            })
            .runWith(Sink.ignore(), materializer);
    })
    .runWith(Sink.ignore(), materializer);
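The consumerSettings and materializer values used by this and the following Akka snippets are not part of them. A minimal sketch, assuming an ActorSystem created for the consumers, the classic ActorMaterializer implied by the explicit materializer argument, and placeholder broker address and group id:

import akka.actor.ActorSystem;
import akka.kafka.ConsumerSettings;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

ActorSystem system = ActorSystem.create("kafka-consumers");
Materializer materializer = ActorMaterializer.create(system);

ConsumerSettings<String, String> consumerSettings =
    ConsumerSettings.create(system, new StringDeserializer(), new StringDeserializer())
        .withBootstrapServers("localhost:9092")
        .withGroupId("akka-group")
        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");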
committable_kafka_akka.java
Consumer
    .committableSource(consumerSettings, Subscriptions.topics(topicName))
    .map(committableMessage -> {
        // processing logic comes here
        return committableMessage;
    })
    .mapAsync(maxParallelism, msg -> CompletableFuture.completedFuture(msg.committableOffset()))
    .runWith(Committer.sink(CommitterSettings.create(committerSettings)), materializer)
    .toCompletableFuture()
    .handle(AppSupport.doneHandler())
    .join();
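The committerSettings argument passed above is not shown either; it is presumably the committer section of the application configuration. A sketch of one way to obtain it, assuming the defaults shipped with Alpakka Kafka and the ActorSystem from the previous sketch:

import com.typesafe.config.Config;

// the akka.kafka.committer section holds Alpakka's committer defaults (batch size, interval, parallelism)
Config committerSettings = system.settings().config().getConfig("akka.kafka.committer");

CommitterSettings.create(system) reads the same section directly from the ActorSystem, so the intermediate Config is only needed when the values come from somewhere else.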
plain_kafka_akka.java
Consumer
    .plainSource(consumerSettings, Subscriptions.topics(topicName))
    .map(consumerRecord -> {
        // processing logic comes here
        return consumerRecord;
    })
    .runWith(Sink.ignore(), materializer)
    .toCompletableFuture()
    .handle(AppSupport.doneHandler())
    .join();
batched_micronaut_kafka_listener.java
@KafkaListener(groupId = "micronaut-group", clientId = "${kafka.consumers.micronaut-group.client-id}", batch = true)
public class BatchedMicronautListener {

    @Topic("${kafka.consumers.micronaut-group.topic}")
    void receive(List<ConsumerRecord<String, String>> records) {
        log.info("Batch received: {}", records.size());
        records.forEach(rec -> {
            // processing logic comes here
        });
    }
}
concurrent_micronaut_kafka_listener.java
@KafkaListener(groupId = "micronaut-group", clientId = "${kafka.consumers.micronaut-group.client-id}", threads = 5)
public class MultithreadedMicronautListener {

    @Topic("${kafka.consumers.micronaut-group.topic}")
    void receive(@KafkaKey String key, String value, int partition) {
        switch (partition) {
            case 0:
                // processing logic comes here
                break;
            case 1:
                // processing logic comes here
                break;
            default:
                // processing logic comes here
                break;
        }
    }
}
simple_micronaut_kafka_listener.java
@KafkaListener(groupId = "micronaut-group", clientId = "${kafka.consumers.micronaut-group.client-id}")
public class MicronautListener {

    @Topic("${kafka.consumers.micronaut-group.topic}")
    void receive(@KafkaKey String key, String value) {
        // processing logic comes here
    }
}
partitioned_spring_kafka_listener.java
@Component
public class PartitionedKafkaListenerConsumer {

    @KafkaListener(
            clientIdPrefix = "part0",
            groupId = "${spring.kafka.consumer.group-id}",
            topicPartitions = {
                    @TopicPartition(topic = "${spring.kafka.consumer.topic}", partitions = {"0"})
            })
    public void listen(ConsumerRecord<String, String> record) {
        // processing logic comes here
    }
}
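Because each listener in this variant is pinned to a single partition, the pattern repeats once per partition. A sketch of a hypothetical counterpart for partition 1 (class name is an assumption):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;

@Component
public class PartitionedKafkaListenerConsumerPart1 {

    @KafkaListener(
            clientIdPrefix = "part1",
            groupId = "${spring.kafka.consumer.group-id}",
            topicPartitions = {
                    @TopicPartition(topic = "${spring.kafka.consumer.topic}", partitions = {"1"})
            })
    public void listen(ConsumerRecord<String, String> record) {
        // processing logic comes here
    }
}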
concurrent_spring_container_factory.java
@Bean
ConcurrentKafkaListenerContainerFactory<String, String> multiThreadedListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // concurrency 3 means three listener containers, i.e. three consumers in the group
    factory.setConcurrency(3);
    return factory;
}
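A listener opts into this factory through the containerFactory attribute of @KafkaListener. A minimal sketch, assuming the same topic and group-id properties as the previous listener (class and method names are assumptions):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class MultiThreadedKafkaListenerConsumer {

    @KafkaListener(
            topics = "${spring.kafka.consumer.topic}",
            groupId = "${spring.kafka.consumer.group-id}",
            containerFactory = "multiThreadedListenerContainerFactory")
    public void listen(ConsumerRecord<String, String> record) {
        // processing logic comes here, on one of the three concurrently running containers
    }
}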