Skip to content

Instantly share code, notes, and snippets.

@jparanda
Created February 5, 2021 14:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jparanda/a881d886cf286b482dde241f1485d7c1 to your computer and use it in GitHub Desktop.
Save jparanda/a881d886cf286b482dde241f1485d7c1 to your computer and use it in GitHub Desktop.
import com.globant.demos.core.config.KafkaConfig;
import com.globant.demos.core.consumer.AemKafkaConsumerJob;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.CountDownLatch;
import lombok.extern.slf4j.Slf4j;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
@Slf4j
@Component
public class AemKafkaConsumerImpl {
@Reference
KafkaConfig kafkaConfig;
private Runnable mainJob;
@Activate
public void initConsumer() {
log.info("Creating and running new AemKafkaConsumerJob...");
String kafkaGroupId = "aemConsumers-" + this.getHostname();
CountDownLatch latch = new CountDownLatch(1);
//Create the consumer runnable
mainJob = new AemKafkaConsumerJob(
kafkaConfig,
kafkaGroupId,
latch);
//start the thread
Thread mainThread = new Thread(mainJob);
mainThread.start();
//add a shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread( () -> {
log.info("Caught shutdown hook");
((AemKafkaConsumerJob)mainJob).shutDown();
}));
try {
latch.await();
} catch (InterruptedException ex) {
log.error("Application got interrupted");
} finally {
log.info("Application is closing");
}
}
private String getHostname() {
String hostname = "Unknown";
try {
InetAddress addr;
addr = InetAddress.getLocalHost();
hostname = addr.getHostName();
} catch (UnknownHostException ex) {
log.error("Hostname can not be resolved, use default one");
}
return hostname;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment