Last active
December 13, 2020 09:39
-
-
Save ru-rocker/8ffac498fdb751362fa2e996a0f6cf83 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class AuctionProcessor implements Processor<String, AuctionDto> { | |
private KeyValueStore<String, AuctionDto> kvStore; | |
private ProcessorContext context; | |
@SuppressWarnings("unchecked") | |
@Override | |
public void init(ProcessorContext context) { | |
this.context = context; | |
kvStore = (KeyValueStore<String, AuctionDto>) context.getStateStore("action-key-store"); | |
} | |
@Override | |
public void process(String key, AuctionDto current) { | |
final AuctionDto prev = kvStore.get(key); | |
if(prev == null || prev.getBidPrice().compareTo(current.getBidPrice()) < 0 ) { | |
kvStore.put(key, current); | |
context.forward(key, current); | |
} | |
} | |
@Override | |
public void close() { | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class AuctionProcessorTest { | |
private AuctionProcessor processor; | |
private MockProcessorContext context; | |
private KeyValueStore<String, AuctionDto> store; | |
@BeforeEach | |
public void init() { | |
processor = new AuctionProcessor(); | |
final Properties config = new Properties(); | |
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "unit-test"); | |
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, ""); | |
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); | |
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, MySerdesFactory.auctionSerde().getClass()); | |
context = new MockProcessorContext(config); | |
store = Stores | |
.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("action-key-store"), | |
Serdes.String(), MySerdesFactory.auctionSerde()) | |
.withLoggingDisabled().build(); | |
store.init(context, store); | |
context.register(store, null); | |
processor.init(context); | |
} | |
@Test | |
public void testInitialBid_then_ReturnTheBid() { | |
AuctionDto auctionDto = AuctionDto.builder() | |
.customerName("Alice") | |
.bidPrice(20.0) | |
.build(); | |
processor.process("item-1", auctionDto); | |
final Iterator<MockProcessorContext.CapturedForward> forwarded = context.forwarded().iterator(); | |
// confirmed that processor forwards the first bid | |
MockProcessorContext.CapturedForward forward = forwarded.next(); | |
assertThat(forward.keyValue()).isEqualTo(new KeyValue<>("item-1", auctionDto)); | |
assertThat(forwarded.hasNext()).isFalse(); | |
// keyStore in processor now contains item-1 with Alice as the bidder with 20.0 | |
assertThat(store.get("item-1")).isEqualTo(auctionDto); | |
} | |
@Test | |
public void testLowerBid_then_ReturnPrevBid() { | |
AuctionDto existingAuction = AuctionDto.builder() | |
.customerName("Alice") | |
.bidPrice(20.0) | |
.build(); | |
// simulate existing auction | |
store.put("item-1", existingAuction); | |
AuctionDto newAuction = AuctionDto.builder() | |
.customerName("Bob") | |
.bidPrice(10.0) | |
.build(); | |
processor.process("item-1", newAuction); | |
final Iterator<MockProcessorContext.CapturedForward> forwarded = context.forwarded().iterator(); | |
// confirmed that processor does not forward anything | |
assertThat(forwarded.hasNext()).isFalse(); | |
// keyStore in processor contains item-1 with existing | |
assertThat(store.get("item-1")).isEqualTo(existingAuction); | |
} | |
@Test | |
public void testSameBid_then_ReturnPrevBid() { | |
AuctionDto existingAuction = AuctionDto.builder() | |
.customerName("Alice") | |
.bidPrice(20.0) | |
.build(); | |
// simulate existing auction | |
store.put("item-1", existingAuction); | |
AuctionDto newAuction = AuctionDto.builder() | |
.customerName("Bob") | |
.bidPrice(20.0) | |
.build(); | |
processor.process("item-1", newAuction); | |
final Iterator<MockProcessorContext.CapturedForward> forwarded = context.forwarded().iterator(); | |
// confirmed that processor does not forward anything | |
assertThat(forwarded.hasNext()).isFalse(); | |
// keyStore in processor contains item-1 with existing | |
assertThat(store.get("item-1")).isEqualTo(existingAuction); | |
} | |
@Test | |
public void testHigherBid_then_ReturnNewBid() { | |
AuctionDto existingAuction = AuctionDto.builder() | |
.customerName("Alice") | |
.bidPrice(20.0) | |
.build(); | |
// simulate existing auction | |
store.put("item-1", existingAuction); | |
AuctionDto newAuction = AuctionDto.builder() | |
.customerName("Bob") | |
.bidPrice(25.0) | |
.build(); | |
processor.process("item-1", newAuction); | |
final Iterator<MockProcessorContext.CapturedForward> forwarded = context.forwarded().iterator(); | |
// confirmed that processor forwards the newest bid | |
MockProcessorContext.CapturedForward forward = forwarded.next(); | |
assertThat(forward.keyValue()).isEqualTo(new KeyValue<>("item-1", newAuction)); | |
assertThat(forwarded.hasNext()).isFalse(); | |
// keyStore in processor contains item-1 with existing | |
assertThat(store.get("item-1")).isEqualTo(newAuction); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public void createCustomProcessor(final Topology topology) { | |
final StoreBuilder<KeyValueStore<String, AuctionDto>> auctionKeyStore = Stores.keyValueStoreBuilder( | |
Stores.persistentKeyValueStore("action-key-store"), Serdes.String(), | |
MySerdesFactory.auctionSerde()); | |
final StoreBuilder<KeyValueStore<String, AuctionDto>> auctionWallClockKeyStore = Stores.keyValueStoreBuilder( | |
Stores.persistentKeyValueStore("action-key-store-schedule-wall-clock"), Serdes.String(), | |
MySerdesFactory.auctionSerde()); | |
final StoreBuilder<KeyValueStore<String, AuctionDto>> auctionStreamTimeKeyStore = Stores.keyValueStoreBuilder( | |
Stores.persistentKeyValueStore("action-key-store-schedule-stream-time"), Serdes.String(), | |
MySerdesFactory.auctionSerde()); | |
topology.addSource("source", Serdes.String().deserializer(), | |
MySerdesFactory.auctionSerde().deserializer(), "auction-bid-input"); | |
// topology for standard auction processor | |
topology.addProcessor("proc", AuctionProcessor::new, "source"); | |
topology.addStateStore(auctionKeyStore, "proc"); | |
topology.addSink("sink", "auction-bid-output", "proc"); | |
// topology for stream time punctuator auction processor | |
topology.addProcessor("proc-stream-time", AuctionWithScheduleStreamTimeProcessor::new, "source"); | |
topology.addStateStore(auctionStreamTimeKeyStore, "proc-stream-time"); | |
topology.addSink("sink-stream-time", "auction-bid-stream-time-output", "proc-stream-time"); | |
// topology for wall clock punctuator auction processor | |
topology.addProcessor("proc-wall-clock", AuctionWithScheduleWallClockProcessor::new, "source"); | |
topology.addStateStore(auctionWallClockKeyStore, "proc-wall-clock"); | |
topology.addSink("sink-wall-clock", "auction-bid-wall-clock-output", "proc-wall-clock"); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// AuctionTopologyTest.java | |
@Test | |
@DisplayName("Test multiple auction with stream time") | |
public void testAuctionWithStreamTime() { | |
AuctionDto alice = AuctionDto.builder() | |
.customerName("Alice") | |
.bidPrice(10.0) | |
.build(); | |
AuctionDto bob = AuctionDto.builder() | |
.customerName("Bob") | |
.bidPrice(9.5) | |
.build(); | |
AuctionDto charlie = AuctionDto.builder() | |
.customerName("Charlie") | |
.bidPrice(10.5) | |
.build(); | |
AuctionDto dave = AuctionDto.builder() | |
.customerName("Dave") | |
.bidPrice(1.5) | |
.build(); | |
Instant now = Instant.now(); | |
inputTopic.pipeInput("item-1", alice, now); | |
inputTopic.pipeInput("item-1", bob, now.plus(1, ChronoUnit.SECONDS)); | |
inputTopic.pipeInput("item-1", charlie, now.plus(2, ChronoUnit.SECONDS)); | |
inputTopic.pipeInput("item-2", dave, now.plus(3, ChronoUnit.SECONDS)); | |
Map<String, AuctionDto> expected = Map.of( | |
"item-1", charlie, | |
"item-2", dave | |
); | |
// make sure the output is not empty | |
assertThat(outputStreamTimeTopic.isEmpty()).isFalse(); | |
Map<String, AuctionDto> result = new HashMap<>(); | |
while(!outputStreamTimeTopic.isEmpty()){ | |
final KeyValue<String, AuctionDto> kv = outputStreamTimeTopic.readKeyValue(); | |
result.put(kv.key, kv.value); | |
} | |
assertThat(result).containsExactlyInAnyOrderEntriesOf(expected); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// AuctionTopologyTest.java | |
@Test | |
@DisplayName("Test multiple auction with wall clock") | |
public void testAuctionWithWallClock() { | |
AuctionDto alice = AuctionDto.builder() | |
.customerName("Alice") | |
.bidPrice(10.0) | |
.build(); | |
AuctionDto bob = AuctionDto.builder() | |
.customerName("Bob") | |
.bidPrice(9.5) | |
.build(); | |
AuctionDto charlie = AuctionDto.builder() | |
.customerName("Charlie") | |
.bidPrice(10.5) | |
.build(); | |
AuctionDto dave = AuctionDto.builder() | |
.customerName("Dave") | |
.bidPrice(1.5) | |
.build(); | |
inputTopic.pipeInput("item-1", alice); | |
inputTopic.pipeInput("item-1", bob); | |
inputTopic.pipeInput("item-1", charlie); | |
inputTopic.pipeInput("item-2", dave); | |
Map<String, AuctionDto> expected = Map.of( | |
"item-1", charlie, | |
"item-2", dave | |
); | |
// output topic should be empty because the wall clock time still not advancing | |
assertThat(outputWallClockTopic.isEmpty()).isTrue(); | |
// advancing wall clock time | |
testDriver.advanceWallClockTime(Duration.ofMillis(1000)); | |
// make sure the output is not empty | |
assertThat(outputWallClockTopic.isEmpty()).isFalse(); | |
Map<String, AuctionDto> result = new HashMap<>(); | |
while(!outputWallClockTopic.isEmpty()){ | |
final KeyValue<String, AuctionDto> kv = outputWallClockTopic.readKeyValue(); | |
result.put(kv.key, kv.value); | |
} | |
assertThat(result).containsExactlyInAnyOrderEntriesOf(expected); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class AuctionWithScheduleStreamTimeProcessor implements Processor<String, AuctionDto> { | |
private KeyValueStore<String, AuctionDto> kvStore; | |
private ProcessorContext context; | |
@SuppressWarnings("unchecked") | |
@Override | |
public void init(ProcessorContext context) { | |
this.context = context; | |
kvStore = (KeyValueStore<String, AuctionDto>) context.getStateStore("action-key-store-schedule-stream-time"); | |
this.context.schedule(Duration.ofMillis(1000), PunctuationType.STREAM_TIME, (timestamp) -> { | |
final KeyValueIterator<String, AuctionDto> iter = this.kvStore.all(); | |
while (iter.hasNext()) { | |
final KeyValue<String, AuctionDto> entry = iter.next(); | |
context.forward(entry.key, entry.value); | |
} | |
iter.close(); | |
// commit the current processing progress | |
context.commit(); | |
}); | |
} | |
@Override | |
public void process(String key, AuctionDto current) { | |
final AuctionDto prev = kvStore.get(key); | |
if(prev == null || prev.getBidPrice().compareTo(current.getBidPrice()) < 0 ) { | |
kvStore.put(key, current); | |
} | |
} | |
@Override | |
public void close() { | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class AuctionWithScheduleWallClockProcessor implements Processor<String, AuctionDto> { | |
private KeyValueStore<String, AuctionDto> kvStore; | |
private ProcessorContext context; | |
@SuppressWarnings("unchecked") | |
@Override | |
public void init(ProcessorContext context) { | |
this.context = context; | |
kvStore = (KeyValueStore<String, AuctionDto>) context.getStateStore("action-key-store-schedule-wall-clock"); | |
this.context.schedule(Duration.ofMillis(1000), PunctuationType.WALL_CLOCK_TIME, (timestamp) -> { | |
final KeyValueIterator<String, AuctionDto> iter = this.kvStore.all(); | |
while (iter.hasNext()) { | |
final KeyValue<String, AuctionDto> entry = iter.next(); | |
context.forward(entry.key, entry.value); | |
} | |
iter.close(); | |
// commit the current processing progress | |
context.commit(); | |
}); | |
} | |
@Override | |
public void process(String key, AuctionDto current) { | |
final AuctionDto prev = kvStore.get(key); | |
if(prev == null || prev.getBidPrice().compareTo(current.getBidPrice()) < 0 ) { | |
kvStore.put(key, current); | |
} | |
} | |
@Override | |
public void close() { | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// EmployeeTopologyTest.java | |
@Test | |
@DisplayName("Test Employee Topology between department and employee, exclude employment history") | |
public void testEmployeeAggregationTopologyWithoutEmploymentHistory() { | |
// Finance Department | |
DepartmentDto financeDept = DepartmentDto.builder() | |
.deptId(1) | |
.deptName("Finance") | |
.build(); | |
// Employee: Alice | |
EmployeeDto alice = EmployeeDto.builder() | |
.empId(1000) | |
.empName("Alice") | |
.deptId(1) | |
.build(); | |
// expected output | |
EmployeeResultDto expected = new EmployeeResultDto(); | |
expected.setDeptId(1); | |
expected.setDeptName("Finance"); | |
expected.setEmpId(1000); | |
expected.setEmpName("Alice"); | |
// 1.1 insert finance dept to DEPT topic. | |
// Remember: I put key as null value because we do key repartitioning in deptTable. | |
// But this depends on your use case. | |
deptInput.pipeInput(null, financeDept); | |
// 1.2 output topic (EMP-RESULT) is empty because inner join behaviour between employee and dept | |
assertThat(employeeOutput.isEmpty()).isTrue(); | |
// 2.1 insert employee to EMPLOYEE topic. | |
// Remember: I put key as null value because we do key repartitioning in empTable. | |
// But this depends on your use case. | |
employeeInput.pipeInput(null, alice); | |
// 2.2 output topic (EMP-RESULT) now is not empty because there are two stream data with associated key (dept_id) | |
assertThat(employeeOutput.isEmpty()).isFalse(); | |
assertThat(employeeOutput.readKeyValue()).isEqualTo(new KeyValue<>(1000, expected)); | |
// 2.3 make sure no record left in the output topic | |
assertThat(employeeOutput.isEmpty()).isTrue(); | |
} | |
@Test | |
@DisplayName("Test Employee Topology between department and employee, include employment history") | |
public void testEmployeeAggregationTopologyWithEmploymentHistory() { | |
// Finance Department | |
DepartmentDto financeDept = DepartmentDto.builder() | |
.deptId(1) | |
.deptName("Finance") | |
.build(); | |
// Employee: Alice | |
EmployeeDto alice = EmployeeDto.builder() | |
.empId(1000) | |
.empName("Alice") | |
.deptId(1) | |
.build(); | |
// History: Company A | |
EmploymentHistoryDto historyCompanyA = EmploymentHistoryDto.builder() | |
.empHistId(1) | |
.empId(1000) | |
.employerName("Company A") | |
.build(); | |
// History: Company B | |
EmploymentHistoryDto historyCompanyB = EmploymentHistoryDto.builder() | |
.empHistId(1) | |
.empId(1000) | |
.employerName("Company B") | |
.build(); | |
// expected output | |
EmployeeResultDto expected = new EmployeeResultDto(); | |
expected.setDeptId(1); | |
expected.setDeptName("Finance"); | |
expected.setEmpId(1000); | |
expected.setEmpName("Alice"); | |
expected.setEmploymentHistory(Set.of("Company A", "Company B")); | |
// 1. insert finance dept to DEPT topic. | |
// Remember: I put key as null value because we do key repartitioning in deptTable. | |
// But this depends on your use case. | |
deptInput.pipeInput(null, financeDept); | |
// 2. insert employee to EMPLOYEE topic. | |
// Remember: I put key as null value because we do key repartitioning in empTable. | |
// But this depends on your use case. | |
employeeInput.pipeInput(null, alice); | |
// 3. insert employee to EMPLOYMENT-HISTORY topic. | |
// Remember: I put key as null value because we do key repartitioning in empTable. | |
// But this depends on your use case. | |
employmentHistoryInput.pipeInput(null, historyCompanyA); | |
employmentHistoryInput.pipeInput(null, historyCompanyB); | |
// make sure topic is not empty | |
assertThat(employeeOutput.isEmpty()).isFalse(); | |
// loop until last records, because we cannot predict the left-join behaviour. | |
// what we now is only the last record should be as what we expected. | |
KeyValue<Integer, EmployeeResultDto> kv = null; | |
while(!employeeOutput.isEmpty()) { | |
kv = employeeOutput.readKeyValue(); | |
} | |
// make sure kv is not null | |
assertThat(kv).isNotNull(); | |
assertThat(kv).isEqualTo(new KeyValue<>(1000, expected)); | |
// make sure no record left in the output topic | |
assertThat(employeeOutput.isEmpty()).isTrue(); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<dependency> | |
<groupId>org.apache.kafka</groupId> | |
<artifactId>kafka-streams-test-utils</artifactId> | |
<version>2.5.0</version> | |
<scope>test</scope> | |
</dependency> | |
<dependency> | |
<groupId>org.assertj</groupId> | |
<artifactId>assertj-core</artifactId> | |
<version>3.18.1</version> | |
<scope>test</scope> | |
</dependency> | |
<dependency> | |
<groupId>org.junit.jupiter</groupId> | |
<artifactId>junit-jupiter-api</artifactId> | |
<version>5.6.2</version> | |
<scope>test</scope> | |
</dependency> | |
<dependency> | |
<groupId>org.junit.jupiter</groupId> | |
<artifactId>junit-jupiter-params</artifactId> | |
<version>5.6.2</version> | |
<scope>test</scope> | |
</dependency> | |
<dependency> | |
<groupId>org.junit.jupiter</groupId> | |
<artifactId>junit-jupiter-engine</artifactId> | |
<version>5.6.2</version> | |
<scope>test</scope> | |
</dependency> | |
<dependency> | |
<groupId>org.junit.platform</groupId> | |
<artifactId>junit-platform-launcher</artifactId> | |
<version>1.6.2</version> | |
<scope>test</scope> | |
</dependency> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//WordCountTimeWindowsTopology.java -- see at line 10 | |
public void createTopology(StreamsBuilder builder) { | |
final KStream<String, String> textLines = builder | |
.stream("streams-plaintext-input", | |
Consumed.with(Serdes.String(), Serdes.String())); | |
textLines | |
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+"))) | |
.groupBy((key, value) -> value) | |
.windowedBy(TimeWindows.of(Duration.ofMinutes(5)).advanceBy(Duration.ofMinutes(5))) | |
.count(Materialized.as("WordCount")) | |
.toStream() | |
.map((key, value) -> new KeyValue<>(key.key(),value)) | |
.to("streams-wordcount-output", | |
Produced.with(Serdes.String(), Serdes.Long())); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// WordCountTimeWindowsTopologyTest.java - see at line 29 | |
@Test | |
@DisplayName("Test word count streams") | |
public void testWordCountStream() { | |
String text1 = "Welcome to kafka streams"; | |
String text2 = "Kafka streams is great"; | |
String text3 = "Welcome back"; | |
// expected output | |
Map<String,Long> expected = Map.of( | |
// please take note, now the welcome is only 1. | |
// Because the second welcome word, come after 5 minutes duration. | |
"welcome", 1L, | |
"to", 1L, | |
"kafka", 2L, | |
"streams", 2L, | |
"is", 1L, | |
"great", 1L, | |
"back", 1L | |
); | |
final Instant now = Instant.now(); | |
// insert two lines with the same timestamp | |
plainTextInput.pipeInput(null,text1, now); | |
plainTextInput.pipeInput(null,text2, now); | |
// simulate 5 minutes after the first two | |
plainTextInput.pipeInput(null,text3, now.plus(5, ChronoUnit.MINUTES)); | |
assertThat(wordCountOutput.isEmpty()).isFalse(); | |
// result | |
Map<String, Long> result = new HashMap<>(); | |
while(!wordCountOutput.isEmpty()) { | |
final KeyValue<String, Long> kv = wordCountOutput.readKeyValue(); | |
result.put(kv.key, kv.value); | |
} | |
assertThat(result).containsExactlyInAnyOrderEntriesOf(expected); | |
assertThat(wordCountOutput.isEmpty()).isTrue(); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public void createTopology(StreamsBuilder builder) { | |
final KStream<String, String> textLines = builder | |
.stream("streams-plaintext-input", | |
Consumed.with(Serdes.String(), Serdes.String())); | |
textLines | |
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+"))) | |
.groupBy((key, value) -> value) | |
.count(Materialized.as("WordCount")) | |
.toStream() | |
.to("streams-wordcount-output", | |
Produced.with(Serdes.String(), Serdes.Long())); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// WordCountTopologyTest.java | |
private TopologyTestDriver testDriver; | |
private TestInputTopic<String, String> plainTextInput; | |
private TestOutputTopic<String, Long> wordCountOutput; | |
private final Serde<String> stringSerde = new Serdes.StringSerde(); | |
private final Serde<Long> longSerde = new Serdes.LongSerde(); | |
@BeforeEach | |
public void init() { | |
final Properties props = new Properties(); | |
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-id"); | |
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); | |
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, stringSerde.getClass().getName()); | |
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, stringSerde.getClass().getName()); | |
WordCountTopology wordCountTopology = new WordCountTopology(); | |
final StreamsBuilder builder = new StreamsBuilder(); | |
wordCountTopology.createTopology(builder); | |
final Topology topology = builder.build(); | |
testDriver = new TopologyTestDriver(topology, props); | |
plainTextInput = testDriver.createInputTopic("streams-plaintext-input", stringSerde.serializer(), | |
stringSerde.serializer()); | |
wordCountOutput = testDriver.createOutputTopic("streams-wordcount-output", stringSerde.deserializer(), | |
longSerde.deserializer()); | |
} | |
@AfterEach | |
public void tearDown() throws IOException { | |
testDriver.close(); | |
FileUtils.deleteDirectory(new File("/tmp/kafka-streams/app-id")); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// WordCountTopologyTest.java | |
@Test | |
@DisplayName("Test word count streams") | |
public void testWordCountStream() { | |
String text1 = "Welcome to kafka streams"; | |
String text2 = "Kafka streams is great"; | |
String text3 = "Welcome back"; | |
// expected output | |
Map<String,Long> expected = Map.of( | |
"welcome", 2L, | |
"to", 1L, | |
"kafka", 2L, | |
"streams", 2L, | |
"is", 1L, | |
"great", 1L, | |
"back", 1L | |
); | |
plainTextInput.pipeInput(null,text1); | |
plainTextInput.pipeInput(null,text2); | |
plainTextInput.pipeInput(null,text3); | |
assertThat(wordCountOutput.isEmpty()).isFalse(); | |
// result | |
Map<String, Long> result = new HashMap<>(); | |
while(!wordCountOutput.isEmpty()) { | |
final KeyValue<String, Long> kv = wordCountOutput.readKeyValue(); | |
result.put(kv.key, kv.value); | |
} | |
assertThat(result).containsExactlyInAnyOrderEntriesOf(expected); | |
assertThat(wordCountOutput.isEmpty()).isTrue(); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment