Created
February 5, 2021 14:33
-
-
Save jparanda/7f30ede7b91334df2adf6312f19f76ec to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.globant.demos.core.config.KafkaConfig; | |
import lombok.extern.slf4j.Slf4j; | |
import org.apache.kafka.clients.consumer.ConsumerConfig; | |
import org.apache.kafka.clients.consumer.ConsumerRecord; | |
import org.apache.kafka.clients.consumer.ConsumerRecords; | |
import org.apache.kafka.clients.consumer.KafkaConsumer; | |
import org.apache.kafka.common.errors.WakeupException; | |
import org.apache.kafka.common.serialization.StringDeserializer; | |
import java.time.Duration; | |
import java.util.Arrays; | |
import java.util.Properties; | |
import java.util.concurrent.CountDownLatch; | |
@Slf4j | |
public class AemKafkaConsumerJob implements Runnable { | |
private static final String DEFAULT_AUTO_COMMIT_INTERVAL = "2000"; | |
private static final String DEFAULT_REQUEST_TIMEOUT = "1500"; | |
private KafkaConsumer<String, String> consumer; | |
private KafkaConfig kafkaConfig; | |
private boolean enabled = true; | |
private long pollTimeOut; | |
private CountDownLatch latch; | |
public AemKafkaConsumerJob(KafkaConfig kafkaConfig, | |
String kafkaGroupId, | |
CountDownLatch latch) { | |
this.kafkaConfig = kafkaConfig; | |
this.pollTimeOut = kafkaConfig.getPollTimeout(); | |
this.latch = latch; | |
log.info("Creating kafka consumer..."); | |
Properties properties = new Properties(); | |
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, | |
kafkaConfig.getClusterServers()); | |
properties.setProperty(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, | |
DEFAULT_REQUEST_TIMEOUT); | |
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, | |
kafkaGroupId); | |
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, | |
StringDeserializer.class.getName()); | |
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, | |
StringDeserializer.class.getName()); | |
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, | |
kafkaConfig.getAutoOffsetReset()); | |
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, | |
DEFAULT_AUTO_COMMIT_INTERVAL); | |
//create the consumer | |
consumer = new KafkaConsumer<String, String>(properties); | |
//subscribe the consumer to our topic(s) | |
consumer.subscribe(Arrays.asList(kafkaConfig.getTopics())); | |
} | |
@Override | |
public void run() { | |
try { | |
while (isEnabled()) { | |
if(!kafkaConfig.isEnabled()) { | |
//poll message and discard without handling | |
log.info("Skipping kafka message consume. Kafka consumer is not enabled by config..."); | |
consumer.poll(Duration.ofMillis(pollTimeOut)); | |
} else { | |
try { | |
ConsumerRecords<String, String> records = | |
consumer.poll(Duration.ofMillis(pollTimeOut)); | |
for (ConsumerRecord<String, String> record : records) { | |
if(record.serializedValueSize() == 0) { | |
log.info("Ignore zero-size record = {}", record); | |
continue; | |
} | |
String jsonMessage = record.value(); | |
log.info("The message get from kafka is {}", jsonMessage); | |
} | |
} catch (WakeupException wke) { | |
log.error("Received shutdown signal!!"); | |
} finally { | |
consumer.close(); | |
//tell our main code we're done with the consumer | |
latch.countDown(); | |
} | |
} | |
} | |
} catch (Exception ex) { | |
log.error(ex.getMessage(), ex); | |
} | |
} | |
public void shutDown() { | |
consumer.wakeup(); | |
} | |
public boolean isEnabled() { | |
return enabled; | |
} | |
public void setEnabled(boolean enabled) { | |
this.enabled = enabled; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment