Created
April 29, 2019 17:59
-
-
Save dineshbhagat/c01148ef544c4c97aa8a08668c3b326d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// in testing module | |
package com.example.demo; | |
import org.junit.Test; | |
import org.junit.runner.RunWith; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.boot.test.context.SpringBootTest; | |
import org.springframework.kafka.core.KafkaTemplate; | |
import org.springframework.kafka.test.EmbeddedKafkaBroker; | |
import org.springframework.kafka.test.context.EmbeddedKafka; | |
import org.springframework.kafka.test.rule.EmbeddedKafkaRule; | |
import org.springframework.test.context.junit4.SpringRunner; | |
import static org.hamcrest.CoreMatchers.equalTo; | |
import static org.hamcrest.MatcherAssert.assertThat; | |
@RunWith(SpringRunner.class) | |
@SpringBootTest | |
@EmbeddedKafka(partitions = 1, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"}) | |
public class DemoApplicationTests { | |
@Autowired | |
private Consumer consumer; | |
@Autowired | |
private KafkaTemplate<String, String> kafkaTemplate; | |
private String myTestingTopic = "myTestingTopic"; | |
@Test | |
public void contextLoads() throws InterruptedException { | |
kafkaTemplate.send(myTestingTopic, "Hello"); | |
Thread.sleep(10000); | |
assertThat(consumer.getList().size(), equalTo(1)); | |
System.out.printf("Hello received"); | |
System.out.println(consumer.getList()); | |
} | |
public EmbeddedKafkaBroker getEmbeddedKafka() { | |
String[] topics = new String[1]; | |
topics[0] = myTestingTopic; | |
EmbeddedKafkaRule embeddedKafkaRule= new EmbeddedKafkaRule(1, false, topics); | |
return embeddedKafkaRule.getEmbeddedKafka(); | |
} | |
} | |
/* output: | |
received payload=ConsumerRecord(topic = myTestingTopic, partition = 0, offset = 0, CreateTime = 1556559649127, serialized key size = -1, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Hello) | |
Hello received[ConsumerRecord(topic = myTestingTopic, partition = 0, offset = 0, CreateTime = 1556559649127, serialized key size = -1, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Hello)] | |
*/ | |
// in main package | |
package com.example.demo; | |
import lombok.Data; | |
import org.apache.kafka.clients.consumer.ConsumerRecord; | |
import org.springframework.kafka.annotation.KafkaListener; | |
import org.springframework.stereotype.Component; | |
import java.util.List; | |
import java.util.concurrent.CopyOnWriteArrayList; | |
@Component | |
@Data | |
public class Consumer { | |
List<String> list= new CopyOnWriteArrayList<>(); | |
@KafkaListener(topics = "${kafkatest.topic}") | |
public void receive(ConsumerRecord<?, ?> consumerRecord) { | |
System.out.println("received payload="+consumerRecord.toString()); | |
list.add(consumerRecord.toString()); | |
} | |
} | |
// pom.xml file | |
<?xml version="1.0" encoding="UTF-8"?> | |
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | |
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | |
<modelVersion>4.0.0</modelVersion> | |
<parent> | |
<groupId>org.springframework.boot</groupId> | |
<artifactId>spring-boot-starter-parent</artifactId> | |
<version>2.1.4.RELEASE</version> | |
<relativePath/> <!-- lookup parent from repository --> | |
</parent> | |
<groupId>com.example</groupId> | |
<artifactId>demo</artifactId> | |
<version>0.0.1-SNAPSHOT</version> | |
<name>demo</name> | |
<description>Demo project for Spring Boot</description> | |
<properties> | |
<java.version>1.8</java.version> | |
<junit.jupiter.version>5.3.2</junit.jupiter.version> | |
</properties> | |
<dependencies> | |
<dependency> | |
<groupId>org.springframework.boot</groupId> | |
<artifactId>spring-boot-starter-web</artifactId> | |
</dependency> | |
<dependency> | |
<groupId>org.apache.kafka</groupId> | |
<artifactId>kafka-streams</artifactId> | |
</dependency> | |
<dependency> | |
<groupId>org.springframework.kafka</groupId> | |
<artifactId>spring-kafka</artifactId> | |
</dependency> | |
<dependency> | |
<groupId>org.projectlombok</groupId> | |
<artifactId>lombok</artifactId> | |
<optional>true</optional> | |
</dependency> | |
<dependency> | |
<groupId>org.springframework.boot</groupId> | |
<artifactId>spring-boot-starter-test</artifactId> | |
<scope>test</scope> | |
</dependency> | |
<dependency> | |
<groupId>org.springframework.kafka</groupId> | |
<artifactId>spring-kafka-test</artifactId> | |
<scope>test</scope> | |
</dependency> | |
<dependency> | |
<groupId>org.springframework.kafka</groupId> | |
<artifactId>spring-kafka-test</artifactId> | |
</dependency> | |
<dependency> | |
<groupId>org.junit.vintage</groupId> | |
<artifactId>junit-vintage-engine</artifactId> | |
<version>${junit.jupiter.version}</version> | |
<scope>test</scope> | |
</dependency> | |
</dependencies> | |
<build> | |
<plugins> | |
<plugin> | |
<groupId>org.springframework.boot</groupId> | |
<artifactId>spring-boot-maven-plugin</artifactId> | |
</plugin> | |
</plugins> | |
</build> | |
</project> | |
// properties file | |
spring.kafka.consumer.auto-offset-reset=earliest | |
spring.kafka.consumer.group-id=myTestGroupId | |
kafkatest.topic=myTestingTopic | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment