Skip to content

Instantly share code, notes, and snippets.

@Normal
Created June 8, 2017 15:57
Show Gist options
  • Save Normal/dde8f442e4a830950741d2bbfe58fc21 to your computer and use it in GitHub Desktop.
Save Normal/dde8f442e4a830950741d2bbfe58fc21 to your computer and use it in GitHub Desktop.
package popatsap.blog.kafkaexample;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
/**
 * Minimal Spring Boot application that publishes three messages to a Kafka topic and logs
 * every record its listener receives from that same topic.
 *
 * <p>Run a Kafka server before running this class:
 *
 * <pre>
 * docker run -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=localhost --env ADVERTISED_PORT=9092 -d spotify/kafka
 * </pre>
 */
@SpringBootApplication
public class Application implements CommandLineRunner {

    private static final Logger logger = LoggerFactory.getLogger(Application.class);

    /** Topic the producer writes to and the listener subscribes to. */
    private static final String TOPIC = "myTopic";

    /**
     * How long {@link #main(String[])} keeps the application context open after start-up so
     * the listener has a chance to receive the messages. This is a fixed, best-effort delay;
     * it does not guarantee that every message has been consumed.
     */
    private static final long CONSUMER_GRACE_PERIOD_MS = 1000L;

    @Autowired
    private KafkaTemplate<String, String> template;

    /**
     * Starts the application, waits {@link #CONSUMER_GRACE_PERIOD_MS} milliseconds and then
     * closes the application context.
     *
     * @param args command-line arguments, passed through to {@link SpringApplication#run}
     * @throws Exception if start-up fails or the waiting thread is interrupted
     */
    public static void main(String[] args) throws Exception {
        // try-with-resources closes the context even when the sleep is interrupted;
        // previously an InterruptedException skipped context.close() entirely.
        try (ConfigurableApplicationContext context = SpringApplication.run(Application.class, args)) {
            /* Give some time to consumer to obtain messages */
            Thread.sleep(CONSUMER_GRACE_PERIOD_MS);
        }
    }

    /**
     * Kafka producer: sends the messages {@code foo1}, {@code foo2} and {@code foo3} to
     * {@link #TOPIC}.
     *
     * @param args command-line arguments (unused)
     * @throws Exception declared by {@link CommandLineRunner#run}
     */
    @Override
    public void run(String... args) throws Exception {
        template.send(TOPIC, "foo1");
        template.send(TOPIC, "foo2");
        template.send(TOPIC, "foo3");
        // send() only queues the records; flush so they are actually written out before
        // the grace period in main() starts counting down.
        template.flush();
    }

    /**
     * Kafka consumer: logs each record received from {@link #TOPIC} at INFO level.
     *
     * @param cr the record delivered by the listener container
     * @throws Exception never thrown by this implementation; kept for signature compatibility
     */
    @KafkaListener(topics = TOPIC)
    public void listen(ConsumerRecord<?, ?> cr) throws Exception {
        logger.info("{}", cr);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment