Skip to content

Instantly share code, notes, and snippets.

@satendrakumar
Last active May 31, 2016 18:08
Show Gist options
  • Save satendrakumar/f931d27c63e2b05f4d815b15e9b229be to your computer and use it in GitHub Desktop.
Save satendrakumar/f931d27c63e2b05f4d815b15e9b229be to your computer and use it in GitHub Desktop.
Read data from a Kafka cluster.
//Worker code
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
/**
 * Processes a single Kafka record on a worker thread.
 *
 * <p>The record's partition, offset and value are collected into a map, a fixed
 * sleep stands in for the real processing work, and the collected data is then
 * printed to standard output together with the name of the executing thread.
 */
public class Worker implements Callable<Boolean> {

    /** Duration of the simulated processing step, in milliseconds. */
    private static final long PROCESSING_TIME_MS = 10000L;

    /** Record handled by this worker; its key/value types are not relevant here. */
    private final ConsumerRecord<?, ?> record;

    /**
     * Creates a worker for the given record.
     *
     * @param record the consumed record to process
     */
    public Worker(ConsumerRecord<?, ?> record) {
        this.record = record;
    }

    /**
     * Processes the record.
     *
     * @return {@code Boolean.TRUE} when processing completed,
     *         {@code Boolean.FALSE} when it was interrupted or failed with an exception
     */
    @Override
    public Boolean call() {
        Map<String, Object> data = new HashMap<>();
        try {
            data.put("partition", record.partition());
            data.put("offset", record.offset());
            data.put("value", record.value());
            Thread.sleep(PROCESSING_TIME_MS);
            System.out.println("Processing Thread---" + Thread.currentThread().getName() + " data: " + data);
            return Boolean.TRUE;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the owning executor can observe the
            // cancellation/shutdown request instead of it being silently lost.
            Thread.currentThread().interrupt();
            e.printStackTrace();
            return Boolean.FALSE;
        } catch (Exception e) {
            e.printStackTrace();
            return Boolean.FALSE;
        }
    }
}
// Consume data from Kafka
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.*;
import java.util.concurrent.*;
/**
 * Polls records from the {@code Test1} and {@code Test2} topics and processes them
 * concurrently on a fixed-size thread pool, committing offsets manually after each
 * batch has been handled.
 *
 * <p>Every polled record is processed: records are grouped into batches of at most
 * {@link #POOL_SIZE}, including a final partial batch. After a batch finishes, the
 * next offset to read is committed for every topic-partition that contributed a
 * record to that batch.
 */
public class AsyncConsumer {

    /** Number of worker threads, and therefore the maximum batch size. */
    private static final int POOL_SIZE = 10;

    /** Maximum time a single {@code poll} call waits for records, in milliseconds. */
    private static final long POLL_TIMEOUT_MS = 1000L;

    /**
     * Runs the consume/process/commit loop until the main thread is interrupted
     * or an exception propagates out of the consumer.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        // Offsets are committed explicitly once a batch has been processed.
        props.put("enable.auto.commit", false);
        // NOTE(review): a batch takes as long as its slowest worker; depending on the
        // client version, long gaps between poll() calls may trigger a rebalance.
        // Confirm these timeouts against the expected processing time.
        props.put("session.timeout.ms", 30000);
        props.put("heartbeat.interval.ms", 10000);
        props.put("request.timeout.ms", 31000);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        ExecutorService es = Executors.newFixedThreadPool(POOL_SIZE);
        CompletionService<Boolean> completionService = new ExecutorCompletionService<>(es);
        try {
            consumer.subscribe(Arrays.asList("Test1", "Test2"));
            while (!Thread.currentThread().isInterrupted()) {
                System.out.println("Polling................");
                ConsumerRecords<String, String> records = consumer.poll(POLL_TIMEOUT_MS);
                List<ConsumerRecord<String, String>> batch = new ArrayList<>(POOL_SIZE);
                for (ConsumerRecord<String, String> record : records) {
                    batch.add(record);
                    if (batch.size() == POOL_SIZE) {
                        processAndCommit(consumer, completionService, batch);
                        // Start a fresh batch; without this only the first full
                        // batch of a poll would ever be processed.
                        batch.clear();
                    }
                }
                // Handle the remainder so polls with fewer than POOL_SIZE records
                // (or a trailing partial batch) are not silently dropped.
                if (!batch.isEmpty()) {
                    processAndCommit(consumer, completionService, batch);
                }
            }
        } finally {
            try {
                consumer.close();
            } finally {
                // The pool threads are non-daemon; stop them so the JVM can exit.
                es.shutdownNow();
            }
        }
    }

    /**
     * Submits one {@link Worker} per record, waits for all of them to finish and then
     * commits, for each topic-partition present in the batch, the offset following the
     * last record of that partition.
     *
     * <p>Offsets are committed even when some workers report failure, matching the
     * original behavior; failures are reported on standard error. If the calling thread
     * is interrupted while waiting, the interrupt flag is restored and nothing is
     * committed.
     *
     * @param consumer          consumer used to commit offsets
     * @param completionService service the workers are submitted to
     * @param batch             non-empty list of records to process
     */
    private static void processAndCommit(KafkaConsumer<String, String> consumer,
                                         CompletionService<Boolean> completionService,
                                         List<ConsumerRecord<String, String>> batch) {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (ConsumerRecord<String, String> record : batch) {
            completionService.submit(new Worker(record));
            // Records of one partition arrive in offset order, so the last put for a
            // partition holds its highest offset in this batch.
            offsets.put(new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1));
        }

        int failures = 0;
        for (int pending = batch.size(); pending > 0; pending--) {
            try {
                if (!Boolean.TRUE.equals(completionService.take().get())) {
                    failures++;
                }
            } catch (ExecutionException e) {
                // The task finished (exceptionally); count it so the loop terminates.
                failures++;
                e.printStackTrace();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
        if (failures > 0) {
            System.err.println("Failed to process " + failures + " of " + batch.size() + " records");
        }
        consumer.commitSync(offsets);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment