Skip to content

Instantly share code, notes, and snippets.

Last active April 20, 2020 02:57
Show Gist options
  • Save joeljames/c26426a08197e798442e1dbf2ab725b5 to your computer and use it in GitHub Desktop.
Save joeljames/c26426a08197e798442e1dbf2ab725b5 to your computer and use it in GitHub Desktop.
public class KafkaConsumerWithThreads {
private static final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
private static final List<String> topics = List.of("my-topic");
private static final int noOfWorkerThreads = 3;
public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(noOfWorkerThreads);
IntStream.range(0, noOfWorkerThreads)
.forEach(i -> service.execute(getRunnableTask()));
getRuntime().addShutdownHook(new Thread(() -> {
static Runnable getRunnableTask() {
return () -> {
while (true) {
try {
if (shutdownRequested.get()) {"Shutdown requested for {}. Exiting...");
} catch (Exception e) {
log.error("Error occurred: ", e);
static void startConsumer() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("", "test-group");
try (KafkaConsumer consumer = new KafkaConsumer(props)) {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(10);
for (ConsumerRecord<String, String> record : records) {"Thread: {}, Topic: {}, Partition: {}, Offset: {}, key: {}, value: {}", Thread.currentThread().getName(), record.topic(), record.partition(), record.offset(), record.key(), record.value().toUpperCase());
} catch (Exception e) {
log.error("Consumer error", e);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment