Skip to content

Instantly share code, notes, and snippets.

Avatar
🏠
Working from home

Michał Chmielarz mchmielarz

🏠
Working from home
View GitHub Profile
View partitioned_kafka_vertx.java
// Create the required number of verticles: one deployment per partition index
// in [0, numberOfPartitions).
IntStream.range(0, numberOfPartitions)
    .forEach(partition ->
        vertx.deployVerticle(
            () -> KafkaPartitionedVerticle.create(topic, partition, kafkaConfig),
            deploymentOptions,
            async -> {
                // The deployment id is only available on success; on failure result() is
                // null, so report the cause instead of logging a misleading "deployed" line.
                if (async.succeeded()) {
                    log.info("Partitioned Kafka consumer deployed. DeploymentId: {}", async.result());
                } else {
                    log.error("Partitioned Kafka consumer deployment failed. Partition: {}",
                        partition, async.cause());
                }
            }
        ));
View simple_kafka_vertx.java
class KafkaVerticle extends AbstractVerticle {
// initialization
@Override
public void start() {
KafkaConsumer.create(vertx, kafkaConfig)
.subscribe(topic)
.handler(record ->
// processing logic comes here
View partitioned_kafka_akka.java
Consumer
.plainPartitionedSource(consumerSettings, Subscriptions.topics(topicName))
.mapAsync(maxPartitions, pair -> {
Source<ConsumerRecord<String, String>, NotUsed> source = pair.second();
return source
.map(record -> {
// processing logic comes here
return record;
})
.runWith(Sink.ignore(), materializer);
View committable_kafka_akka.java
Consumer
.committableSource(consumerSettings, Subscriptions.topics(topicName))
.map(committableMessage -> {
// processing logic comes here
return committableMessage;
})
.mapAsync(maxParallelism, msg -> CompletableFuture.completedFuture(msg.committableOffset()))
.runWith(Committer.sink(CommitterSettings.create(committerSettings)), materializer)
.toCompletableFuture()
.handle(AppSupport.doneHandler())
View plain_kafka_akka.java
// Runs a stream over the records of topicName and waits for it to finish.
// No offset commit step appears anywhere in this pipeline.
Consumer
// Source built from consumerSettings, subscribed to the single topic topicName.
.plainSource(consumerSettings, Subscriptions.topics(topicName))
.map(consumerRecord -> {
// processing logic comes here
// The record is passed downstream unchanged.
return consumerRecord;
})
// Elements are drained into Sink.ignore(); only the materialized completion value is used below.
.runWith(Sink.ignore(), materializer)
.toCompletableFuture()
// NOTE(review): AppSupport.doneHandler() is defined outside this snippet; presumably it
// reports stream completion/failure - confirm against its definition.
.handle(AppSupport.doneHandler())
// join() blocks the calling thread until the handled future completes.
.join();
View batched_micronaut_kafka_listener.java
@KafkaListener(groupId = "micronaut-group", clientId = "${kafka.consumers.micronaut-group.client-id}", batch = true)
public class BatchedMicronautListener {
@Topic("${kafka.consumers.micronaut-group.topic}")
void receive(List<ConsumerRecord<String, String>> records) {
log.info("Batch received: {}", records.size());
records.forEach(rec ->
// processing logic comes here
);
}
}
View concurrent_microunat_kafka_listener.java
@KafkaListener(groupId = "micronaut-group", clientId = "${kafka.consumers.micronaut-group.client-id}", threads = 5)
public class MultithreadedMicronautListener {
@Topic("${kafka.consumers.micronaut-group.topic}")
void receive(@KafkaKey String key, String value, int partition) {
switch (partition) {
case 0:
// processing logic comes here
break;
case 1:
// processing logic comes here
View simple_micronaut_kafka_listener.java
/**
 * Kafka listener for the {@code micronaut-group} consumer group; the client id and the
 * topic are both resolved from configuration placeholders.
 */
@KafkaListener(
    groupId = "micronaut-group",
    clientId = "${kafka.consumers.micronaut-group.client-id}"
)
public class MicronautListener {

    /**
     * Receives a single message from the configured topic.
     *
     * @param key   the message key (bound through {@code @KafkaKey})
     * @param value the message value
     */
    @Topic("${kafka.consumers.micronaut-group.topic}")
    void receive(@KafkaKey String key, String value) {
        // processing logic comes here
    }
}
View partitioned_spring_kafka_listener.java
@Component
public class PartitionedKafkaListenerConsumer {
@KafkaListener(
clientIdPrefix = "part0",
topics = "${spring.kafka.consumer.topic}",
groupId = "${spring.kafka.consumer.group-id}",
topicPartitions = {
@TopicPartition(topic = "${spring.kafka.consumer.topic}", partitions = {"0"})
})
View concurrent_spring_container_factory.java
/**
 * Builds a listener container factory backed by {@code consumerFactory()} with its
 * concurrency set to 3.
 *
 * @return the configured container factory
 */
@Bean
ConcurrentKafkaListenerContainerFactory<String, String> multiThreadedListenerContainerFactory() {
    final ConcurrentKafkaListenerContainerFactory<String, String> containerFactory =
        new ConcurrentKafkaListenerContainerFactory<>();
    // The two setters are independent of each other; the order is not significant.
    containerFactory.setConcurrency(3);
    containerFactory.setConsumerFactory(consumerFactory());
    return containerFactory;
}