Skip to content

Instantly share code, notes, and snippets.

@jparanda
Created February 5, 2021 14:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jparanda/7f30ede7b91334df2adf6312f19f76ec to your computer and use it in GitHub Desktop.
import com.globant.demos.core.config.KafkaConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
@Slf4j
public class AemKafkaConsumerJob implements Runnable {

    private static final String DEFAULT_AUTO_COMMIT_INTERVAL = "2000";
    private static final String DEFAULT_REQUEST_TIMEOUT = "1500";

    private final KafkaConsumer<String, String> consumer;
    private final KafkaConfig kafkaConfig;
    private final long pollTimeOut;
    private final CountDownLatch latch;
    // volatile: read by the polling thread in run(), written from another thread
    // via setEnabled(); without it the loop may never observe the change.
    private volatile boolean enabled = true;

    /**
     * Creates a Kafka consumer configured from {@code kafkaConfig} and subscribes it
     * to the configured topic(s). The consumer is driven by {@link #run()}.
     *
     * @param kafkaConfig  source of cluster servers, poll timeout, offset-reset policy,
     *                     topics, and the runtime enable/disable flag
     * @param kafkaGroupId consumer group id for this job
     * @param latch        counted down exactly once when the consume loop has finished
     *                     and the consumer is closed
     */
    public AemKafkaConsumerJob(KafkaConfig kafkaConfig,
                               String kafkaGroupId,
                               CountDownLatch latch) {
        this.kafkaConfig = kafkaConfig;
        this.pollTimeOut = kafkaConfig.getPollTimeout();
        this.latch = latch;
        log.info("Creating kafka consumer...");
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                kafkaConfig.getClusterServers());
        properties.setProperty(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG,
                DEFAULT_REQUEST_TIMEOUT);
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,
                kafkaGroupId);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
                kafkaConfig.getAutoOffsetReset());
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
                DEFAULT_AUTO_COMMIT_INTERVAL);
        // create the consumer
        consumer = new KafkaConsumer<String, String>(properties);
        // subscribe the consumer to our topic(s)
        consumer.subscribe(Arrays.asList(kafkaConfig.getTopics()));
    }

    /**
     * Poll loop. Runs until {@link #setEnabled(boolean)} flips the flag off or
     * {@link #shutDown()} wakes the consumer up (which surfaces as a
     * {@link WakeupException} from {@code poll}).
     *
     * <p>Bug fix vs. the original: {@code consumer.close()} and
     * {@code latch.countDown()} used to live in a {@code finally} INSIDE the while
     * loop, so the consumer was closed after the first poll iteration while the loop
     * kept running — the next {@code poll} on the closed consumer would throw
     * {@code IllegalStateException}. They now run exactly once, after the loop exits
     * for any reason.
     */
    @Override
    public void run() {
        try {
            while (isEnabled()) {
                if (!kafkaConfig.isEnabled()) {
                    // poll message and discard without handling
                    log.info("Skipping kafka message consume. Kafka consumer is not enabled by config...");
                    consumer.poll(Duration.ofMillis(pollTimeOut));
                    continue;
                }
                ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofMillis(pollTimeOut));
                for (ConsumerRecord<String, String> record : records) {
                    // Tombstones / empty payloads carry nothing to process.
                    if (record.serializedValueSize() == 0) {
                        log.info("Ignore zero-size record = {}", record);
                        continue;
                    }
                    String jsonMessage = record.value();
                    log.info("The message get from kafka is {}", jsonMessage);
                }
            }
        } catch (WakeupException wke) {
            // Expected path when shutDown() is called from another thread;
            // it simply terminates the poll loop.
            log.error("Received shutdown signal!!");
        } catch (Exception ex) {
            log.error(ex.getMessage(), ex);
        } finally {
            consumer.close();
            // tell our main code we're done with the consumer
            latch.countDown();
        }
    }

    /**
     * Requests shutdown from another thread. {@code wakeup()} is the only
     * KafkaConsumer method that is safe to call concurrently; it makes the
     * blocked {@code poll} in {@link #run()} throw {@link WakeupException}.
     */
    public void shutDown() {
        consumer.wakeup();
    }

    public boolean isEnabled() {
        return enabled;
    }

    public void setEnabled(boolean enabled) {
        this.enabled = enabled;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment