Skip to content

Instantly share code, notes, and snippets.

@joeljames
Last active April 20, 2020 02:57
Show Gist options
  • Save joeljames/c26426a08197e798442e1dbf2ab725b5 to your computer and use it in GitHub Desktop.
Save joeljames/c26426a08197e798442e1dbf2ab725b5 to your computer and use it in GitHub Desktop.
@Slf4j
public class KafkaConsumerWithThreads {
private static final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
private static final List<String> topics = List.of("my-topic");
private static final int noOfWorkerThreads = 3;
public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(noOfWorkerThreads);
IntStream.range(0, noOfWorkerThreads)
.forEach(i -> service.execute(getRunnableTask()));
getRuntime().addShutdownHook(new Thread(() -> {
shutdownRequested.set(true);
ConcurrentUtils.stop(service);
}));
}
static Runnable getRunnableTask() {
return () -> {
while (true) {
try {
if (shutdownRequested.get()) {
log.info("Shutdown requested for {}. Exiting...");
return;
}
startConsumer();
} catch (Exception e) {
log.error("Error occurred: ", e);
}
}
};
}
static void startConsumer() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("group.id", "test-group");
try (KafkaConsumer consumer = new KafkaConsumer(props)) {
consumer.subscribe(topics);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(10);
for (ConsumerRecord<String, String> record : records) {
log.info("Thread: {}, Topic: {}, Partition: {}, Offset: {}, key: {}, value: {}", Thread.currentThread().getName(), record.topic(), record.partition(), record.offset(), record.key(), record.value().toUpperCase());
}
}
} catch (Exception e) {
log.error("Consumer error", e);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment