Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
package com.softwaremill.react.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Arrays;
import java.util.Properties;
import java.util.UUID;
/**
 * Stand-alone reproduction program: fills a freshly named topic with a fixed
 * number of small messages and then reads them back with a new consumer
 * group, printing how many records each {@code poll} call returns.
 *
 * <p>The broker is expected at {@code localhost:9092}. Both the topic name and
 * the consumer group id are random UUIDs, so every run starts from scratch.
 */
public class KafkaPauses {
    /** Address of the Kafka broker used by both the producer and the consumer. */
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    /** Number of messages written to the topic and then expected back. */
    private static final int ELEMENT_COUNT = 500000;

    /** Timeout, in milliseconds, passed to each consumer poll. */
    private static final long POLL_TIMEOUT_MS = 1000;

    /**
     * Runs the write phase followed by the read phase.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        String topic = UUID.randomUUID().toString();
        String groupId = UUID.randomUUID().toString();

        fillTopic(topic);
        drainTopic(topic, groupId);
    }

    /**
     * Sends {@link #ELEMENT_COUNT} copies of the message {@code "msg"} to the topic.
     *
     * @param topic name of the topic to write to
     */
    private static void fillTopic(String topic) {
        System.out.println("Filling Kafka with " + ELEMENT_COUNT + " elements");
        // try-with-resources closes the producer even if a send throws.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties())) {
            for (int i = 0; i < ELEMENT_COUNT; i++) {
                producer.send(new ProducerRecord<String, String>(topic, "msg"));
            }
        }
        System.out.println("Topic " + topic + " filled");
    }

    /**
     * Polls the topic until at least {@link #ELEMENT_COUNT} records have been
     * returned, printing the size of every batch.
     *
     * <p>The loop has no other exit condition: it keeps polling for as long as
     * fewer than {@link #ELEMENT_COUNT} records have been seen.
     *
     * @param topic   name of the topic to read from
     * @param groupId consumer group id to subscribe with
     */
    private static void drainTopic(String topic, String groupId) {
        // try-with-resources closes the consumer even if a poll throws.
        try (KafkaConsumer<String, String> consumer =
                 new KafkaConsumer<>(consumerProperties(groupId))) {
            consumer.subscribe(Arrays.asList(topic));
            int readCount = 0;
            while (readCount < ELEMENT_COUNT) {
                ConsumerRecords<String, String> records = consumer.poll(POLL_TIMEOUT_MS);
                System.out.println("Poll returned " + records.count() + " elements");
                // Only the batch size matters here; record contents are not inspected.
                readCount += records.count();
            }
        }
    }

    /**
     * Builds the producer configuration: String keys and values, {@code acks=all}.
     *
     * @return a new, mutable {@link Properties} instance
     */
    private static Properties producerProperties() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        properties.put("acks", "all");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return properties;
    }

    /**
     * Builds the consumer configuration: String keys and values, reading from
     * the earliest offset, with auto-commit switched off.
     *
     * @param groupId value for the {@code group.id} setting
     * @return a new, mutable {@link Properties} instance
     */
    private static Properties consumerProperties(String groupId) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("group.id", groupId);
        properties.put("auto.offset.reset", "earliest");
        properties.put("enable.auto.commit", "false");
        // !!!!!!!!!!!!!!!!!!!!!
        // This setting is the switch for the behavior under investigation:
        // per the original author, removing it makes the problem appear.
        properties.put("max.partition.fetch.bytes", "350000"); // <<<<<< comment this for trouble
        return properties;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment