Read data from a Kafka cluster: poll records in batches of poolSize, process each batch in parallel on a fixed thread pool via a CompletionService, and commit offsets manually once the whole batch has finished.
// Worker code
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;

public class Worker implements Callable<Boolean> {

    private final ConsumerRecord<String, String> record;

    public Worker(ConsumerRecord<String, String> record) {
        this.record = record;
    }

    public Boolean call() {
        Map<String, Object> data = new HashMap<>();
        try {
            data.put("partition", record.partition());
            data.put("offset", record.offset());
            data.put("value", record.value());
            // Simulate slow processing so the parallelism across worker threads is visible.
            Thread.sleep(10000);
            System.out.println("Processing Thread---" + Thread.currentThread().getName() + " data: " + data);
            return Boolean.TRUE;
        } catch (Exception e) {
            e.printStackTrace();
            return Boolean.FALSE;
        }
    }
}
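The Worker can be exercised on its own, without a running broker, by constructing a ConsumerRecord directly. A minimal sketch, assuming the plain five-argument ConsumerRecord constructor; the class name, topic, offset, and values below are chosen for illustration and are not part of the original gist:

// Sketch: run a single Worker standalone; topic, offset, and values are illustrative.
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class WorkerDemo {
    public static void main(String[] args) throws Exception {
        ConsumerRecord<String, String> record = new ConsumerRecord<>("Test1", 0, 42L, "key", "value");
        ExecutorService es = Executors.newSingleThreadExecutor();
        Future<Boolean> result = es.submit(new Worker(record));
        // Blocks for the ~10 s Thread.sleep inside Worker.call(), then prints true.
        System.out.println("Worker returned: " + result.get());
        es.shutdown();
    }
}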
// Consume data from Kafka
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.*;
import java.util.concurrent.*;

public class AsyncConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("enable.auto.commit", false); // offsets are committed manually after each batch
        props.put("session.timeout.ms", 30000);
        props.put("heartbeat.interval.ms", 10000);
        props.put("request.timeout.ms", 31000);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("Test1", "Test2"));

        int poolSize = 10;
        ExecutorService es = Executors.newFixedThreadPool(poolSize);
        CompletionService<Boolean> completionService = new ExecutorCompletionService<>(es);

        try {
            while (true) {
                System.out.println("Polling................");
                ConsumerRecords<String, String> records = consumer.poll(1000);
                // Records left over when a poll returns fewer than poolSize are not processed here.
                List<ConsumerRecord<String, String>> recordList = new ArrayList<>();
                for (ConsumerRecord<String, String> record : records) {
                    recordList.add(record);
                    if (recordList.size() == poolSize) {
                        int taskCount = poolSize;
                        // Process the batch in parallel, one Worker per record.
                        recordList.forEach(recordToProcess -> completionService.submit(new Worker(recordToProcess)));
                        // Wait until every worker in the batch has finished.
                        while (taskCount > 0) {
                            try {
                                Future<Boolean> futureResult = completionService.poll(1, TimeUnit.SECONDS);
                                if (futureResult != null) {
                                    futureResult.get();
                                    taskCount = taskCount - 1;
                                }
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
                        // Commit the offset of the last record in the batch; see the per-partition
                        // sketch below for batches that span several partitions.
                        Map<TopicPartition, OffsetAndMetadata> commitOffset = Collections.singletonMap(
                                new TopicPartition(record.topic(), record.partition()),
                                new OffsetAndMetadata(record.offset() + 1));
                        consumer.commitSync(commitOffset);
                        // Start a fresh batch; without this the size check above never matches again.
                        recordList.clear();
                    }
                }
            }
        } finally {
            consumer.close();
            es.shutdown();
        }
    }
}
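The commit above covers only the offset of the last record added to the batch, which is enough when the whole batch comes from a single partition. When the subscribed topics have several partitions, a sketch along these lines (using the same commitSync, TopicPartition, and OffsetAndMetadata APIs; the map name offsetsToCommit is chosen here for illustration) would replace the singletonMap commit and record the highest processed offset for each partition in the batch:

// Sketch: commit the highest processed offset per partition instead of only the last record's.
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
for (ConsumerRecord<String, String> processed : recordList) {
    TopicPartition tp = new TopicPartition(processed.topic(), processed.partition());
    OffsetAndMetadata next = new OffsetAndMetadata(processed.offset() + 1);
    OffsetAndMetadata current = offsetsToCommit.get(tp);
    if (current == null || current.offset() < next.offset()) {
        offsetsToCommit.put(tp, next);
    }
}
consumer.commitSync(offsetsToCommit);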