Skip to content

Instantly share code, notes, and snippets.

@ru-rocker
Last active December 13, 2020 09:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ru-rocker/8ffac498fdb751362fa2e996a0f6cf83 to your computer and use it in GitHub Desktop.
Save ru-rocker/8ffac498fdb751362fa2e996a0f6cf83 to your computer and use it in GitHub Desktop.
public class AuctionProcessor implements Processor<String, AuctionDto> {
private KeyValueStore<String, AuctionDto> kvStore;
private ProcessorContext context;
@SuppressWarnings("unchecked")
@Override
public void init(ProcessorContext context) {
this.context = context;
kvStore = (KeyValueStore<String, AuctionDto>) context.getStateStore("action-key-store");
}
@Override
public void process(String key, AuctionDto current) {
final AuctionDto prev = kvStore.get(key);
if(prev == null || prev.getBidPrice().compareTo(current.getBidPrice()) < 0 ) {
kvStore.put(key, current);
context.forward(key, current);
}
}
@Override
public void close() {
}
}
public class AuctionProcessorTest {
private AuctionProcessor processor;
private MockProcessorContext context;
private KeyValueStore<String, AuctionDto> store;
@BeforeEach
public void init() {
processor = new AuctionProcessor();
final Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "unit-test");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, MySerdesFactory.auctionSerde().getClass());
context = new MockProcessorContext(config);
store = Stores
.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("action-key-store"),
Serdes.String(), MySerdesFactory.auctionSerde())
.withLoggingDisabled().build();
store.init(context, store);
context.register(store, null);
processor.init(context);
}
@Test
public void testInitialBid_then_ReturnTheBid() {
AuctionDto auctionDto = AuctionDto.builder()
.customerName("Alice")
.bidPrice(20.0)
.build();
processor.process("item-1", auctionDto);
final Iterator<MockProcessorContext.CapturedForward> forwarded = context.forwarded().iterator();
// confirmed that processor forwards the first bid
MockProcessorContext.CapturedForward forward = forwarded.next();
assertThat(forward.keyValue()).isEqualTo(new KeyValue<>("item-1", auctionDto));
assertThat(forwarded.hasNext()).isFalse();
// keyStore in processor now contains item-1 with Alice as the bidder with 20.0
assertThat(store.get("item-1")).isEqualTo(auctionDto);
}
@Test
public void testLowerBid_then_ReturnPrevBid() {
AuctionDto existingAuction = AuctionDto.builder()
.customerName("Alice")
.bidPrice(20.0)
.build();
// simulate existing auction
store.put("item-1", existingAuction);
AuctionDto newAuction = AuctionDto.builder()
.customerName("Bob")
.bidPrice(10.0)
.build();
processor.process("item-1", newAuction);
final Iterator<MockProcessorContext.CapturedForward> forwarded = context.forwarded().iterator();
// confirmed that processor does not forward anything
assertThat(forwarded.hasNext()).isFalse();
// keyStore in processor contains item-1 with existing
assertThat(store.get("item-1")).isEqualTo(existingAuction);
}
@Test
public void testSameBid_then_ReturnPrevBid() {
AuctionDto existingAuction = AuctionDto.builder()
.customerName("Alice")
.bidPrice(20.0)
.build();
// simulate existing auction
store.put("item-1", existingAuction);
AuctionDto newAuction = AuctionDto.builder()
.customerName("Bob")
.bidPrice(20.0)
.build();
processor.process("item-1", newAuction);
final Iterator<MockProcessorContext.CapturedForward> forwarded = context.forwarded().iterator();
// confirmed that processor does not forward anything
assertThat(forwarded.hasNext()).isFalse();
// keyStore in processor contains item-1 with existing
assertThat(store.get("item-1")).isEqualTo(existingAuction);
}
@Test
public void testHigherBid_then_ReturnNewBid() {
AuctionDto existingAuction = AuctionDto.builder()
.customerName("Alice")
.bidPrice(20.0)
.build();
// simulate existing auction
store.put("item-1", existingAuction);
AuctionDto newAuction = AuctionDto.builder()
.customerName("Bob")
.bidPrice(25.0)
.build();
processor.process("item-1", newAuction);
final Iterator<MockProcessorContext.CapturedForward> forwarded = context.forwarded().iterator();
// confirmed that processor forwards the newest bid
MockProcessorContext.CapturedForward forward = forwarded.next();
assertThat(forward.keyValue()).isEqualTo(new KeyValue<>("item-1", newAuction));
assertThat(forwarded.hasNext()).isFalse();
// keyStore in processor contains item-1 with existing
assertThat(store.get("item-1")).isEqualTo(newAuction);
}
}
public void createCustomProcessor(final Topology topology) {
final StoreBuilder<KeyValueStore<String, AuctionDto>> auctionKeyStore = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("action-key-store"), Serdes.String(),
MySerdesFactory.auctionSerde());
final StoreBuilder<KeyValueStore<String, AuctionDto>> auctionWallClockKeyStore = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("action-key-store-schedule-wall-clock"), Serdes.String(),
MySerdesFactory.auctionSerde());
final StoreBuilder<KeyValueStore<String, AuctionDto>> auctionStreamTimeKeyStore = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("action-key-store-schedule-stream-time"), Serdes.String(),
MySerdesFactory.auctionSerde());
topology.addSource("source", Serdes.String().deserializer(),
MySerdesFactory.auctionSerde().deserializer(), "auction-bid-input");
// topology for standard auction processor
topology.addProcessor("proc", AuctionProcessor::new, "source");
topology.addStateStore(auctionKeyStore, "proc");
topology.addSink("sink", "auction-bid-output", "proc");
// topology for stream time punctuator auction processor
topology.addProcessor("proc-stream-time", AuctionWithScheduleStreamTimeProcessor::new, "source");
topology.addStateStore(auctionStreamTimeKeyStore, "proc-stream-time");
topology.addSink("sink-stream-time", "auction-bid-stream-time-output", "proc-stream-time");
// topology for wall clock punctuator auction processor
topology.addProcessor("proc-wall-clock", AuctionWithScheduleWallClockProcessor::new, "source");
topology.addStateStore(auctionWallClockKeyStore, "proc-wall-clock");
topology.addSink("sink-wall-clock", "auction-bid-wall-clock-output", "proc-wall-clock");
}
// AuctionTopologyTest.java
@Test
@DisplayName("Test multiple auction with stream time")
public void testAuctionWithStreamTime() {
AuctionDto alice = AuctionDto.builder()
.customerName("Alice")
.bidPrice(10.0)
.build();
AuctionDto bob = AuctionDto.builder()
.customerName("Bob")
.bidPrice(9.5)
.build();
AuctionDto charlie = AuctionDto.builder()
.customerName("Charlie")
.bidPrice(10.5)
.build();
AuctionDto dave = AuctionDto.builder()
.customerName("Dave")
.bidPrice(1.5)
.build();
Instant now = Instant.now();
inputTopic.pipeInput("item-1", alice, now);
inputTopic.pipeInput("item-1", bob, now.plus(1, ChronoUnit.SECONDS));
inputTopic.pipeInput("item-1", charlie, now.plus(2, ChronoUnit.SECONDS));
inputTopic.pipeInput("item-2", dave, now.plus(3, ChronoUnit.SECONDS));
Map<String, AuctionDto> expected = Map.of(
"item-1", charlie,
"item-2", dave
);
// make sure the output is not empty
assertThat(outputStreamTimeTopic.isEmpty()).isFalse();
Map<String, AuctionDto> result = new HashMap<>();
while(!outputStreamTimeTopic.isEmpty()){
final KeyValue<String, AuctionDto> kv = outputStreamTimeTopic.readKeyValue();
result.put(kv.key, kv.value);
}
assertThat(result).containsExactlyInAnyOrderEntriesOf(expected);
}
// AuctionTopologyTest.java
@Test
@DisplayName("Test multiple auction with wall clock")
public void testAuctionWithWallClock() {
AuctionDto alice = AuctionDto.builder()
.customerName("Alice")
.bidPrice(10.0)
.build();
AuctionDto bob = AuctionDto.builder()
.customerName("Bob")
.bidPrice(9.5)
.build();
AuctionDto charlie = AuctionDto.builder()
.customerName("Charlie")
.bidPrice(10.5)
.build();
AuctionDto dave = AuctionDto.builder()
.customerName("Dave")
.bidPrice(1.5)
.build();
inputTopic.pipeInput("item-1", alice);
inputTopic.pipeInput("item-1", bob);
inputTopic.pipeInput("item-1", charlie);
inputTopic.pipeInput("item-2", dave);
Map<String, AuctionDto> expected = Map.of(
"item-1", charlie,
"item-2", dave
);
// output topic should be empty because the wall clock time still not advancing
assertThat(outputWallClockTopic.isEmpty()).isTrue();
// advancing wall clock time
testDriver.advanceWallClockTime(Duration.ofMillis(1000));
// make sure the output is not empty
assertThat(outputWallClockTopic.isEmpty()).isFalse();
Map<String, AuctionDto> result = new HashMap<>();
while(!outputWallClockTopic.isEmpty()){
final KeyValue<String, AuctionDto> kv = outputWallClockTopic.readKeyValue();
result.put(kv.key, kv.value);
}
assertThat(result).containsExactlyInAnyOrderEntriesOf(expected);
}
public class AuctionWithScheduleStreamTimeProcessor implements Processor<String, AuctionDto> {
private KeyValueStore<String, AuctionDto> kvStore;
private ProcessorContext context;
@SuppressWarnings("unchecked")
@Override
public void init(ProcessorContext context) {
this.context = context;
kvStore = (KeyValueStore<String, AuctionDto>) context.getStateStore("action-key-store-schedule-stream-time");
this.context.schedule(Duration.ofMillis(1000), PunctuationType.STREAM_TIME, (timestamp) -> {
final KeyValueIterator<String, AuctionDto> iter = this.kvStore.all();
while (iter.hasNext()) {
final KeyValue<String, AuctionDto> entry = iter.next();
context.forward(entry.key, entry.value);
}
iter.close();
// commit the current processing progress
context.commit();
});
}
@Override
public void process(String key, AuctionDto current) {
final AuctionDto prev = kvStore.get(key);
if(prev == null || prev.getBidPrice().compareTo(current.getBidPrice()) < 0 ) {
kvStore.put(key, current);
}
}
@Override
public void close() {
}
}
public class AuctionWithScheduleWallClockProcessor implements Processor<String, AuctionDto> {
private KeyValueStore<String, AuctionDto> kvStore;
private ProcessorContext context;
@SuppressWarnings("unchecked")
@Override
public void init(ProcessorContext context) {
this.context = context;
kvStore = (KeyValueStore<String, AuctionDto>) context.getStateStore("action-key-store-schedule-wall-clock");
this.context.schedule(Duration.ofMillis(1000), PunctuationType.WALL_CLOCK_TIME, (timestamp) -> {
final KeyValueIterator<String, AuctionDto> iter = this.kvStore.all();
while (iter.hasNext()) {
final KeyValue<String, AuctionDto> entry = iter.next();
context.forward(entry.key, entry.value);
}
iter.close();
// commit the current processing progress
context.commit();
});
}
@Override
public void process(String key, AuctionDto current) {
final AuctionDto prev = kvStore.get(key);
if(prev == null || prev.getBidPrice().compareTo(current.getBidPrice()) < 0 ) {
kvStore.put(key, current);
}
}
@Override
public void close() {
}
}
// EmployeeTopologyTest.java
@Test
@DisplayName("Test Employee Topology between department and employee, exclude employment history")
public void testEmployeeAggregationTopologyWithoutEmploymentHistory() {
// Finance Department
DepartmentDto financeDept = DepartmentDto.builder()
.deptId(1)
.deptName("Finance")
.build();
// Employee: Alice
EmployeeDto alice = EmployeeDto.builder()
.empId(1000)
.empName("Alice")
.deptId(1)
.build();
// expected output
EmployeeResultDto expected = new EmployeeResultDto();
expected.setDeptId(1);
expected.setDeptName("Finance");
expected.setEmpId(1000);
expected.setEmpName("Alice");
// 1.1 insert finance dept to DEPT topic.
// Remember: I put key as null value because we do key repartitioning in deptTable.
// But this depends on your use case.
deptInput.pipeInput(null, financeDept);
// 1.2 output topic (EMP-RESULT) is empty because inner join behaviour between employee and dept
assertThat(employeeOutput.isEmpty()).isTrue();
// 2.1 insert employee to EMPLOYEE topic.
// Remember: I put key as null value because we do key repartitioning in empTable.
// But this depends on your use case.
employeeInput.pipeInput(null, alice);
// 2.2 output topic (EMP-RESULT) now is not empty because there are two stream data with associated key (dept_id)
assertThat(employeeOutput.isEmpty()).isFalse();
assertThat(employeeOutput.readKeyValue()).isEqualTo(new KeyValue<>(1000, expected));
// 2.3 make sure no record left in the output topic
assertThat(employeeOutput.isEmpty()).isTrue();
}
@Test
@DisplayName("Test Employee Topology between department and employee, include employment history")
public void testEmployeeAggregationTopologyWithEmploymentHistory() {
// Finance Department
DepartmentDto financeDept = DepartmentDto.builder()
.deptId(1)
.deptName("Finance")
.build();
// Employee: Alice
EmployeeDto alice = EmployeeDto.builder()
.empId(1000)
.empName("Alice")
.deptId(1)
.build();
// History: Company A
EmploymentHistoryDto historyCompanyA = EmploymentHistoryDto.builder()
.empHistId(1)
.empId(1000)
.employerName("Company A")
.build();
// History: Company B
EmploymentHistoryDto historyCompanyB = EmploymentHistoryDto.builder()
.empHistId(1)
.empId(1000)
.employerName("Company B")
.build();
// expected output
EmployeeResultDto expected = new EmployeeResultDto();
expected.setDeptId(1);
expected.setDeptName("Finance");
expected.setEmpId(1000);
expected.setEmpName("Alice");
expected.setEmploymentHistory(Set.of("Company A", "Company B"));
// 1. insert finance dept to DEPT topic.
// Remember: I put key as null value because we do key repartitioning in deptTable.
// But this depends on your use case.
deptInput.pipeInput(null, financeDept);
// 2. insert employee to EMPLOYEE topic.
// Remember: I put key as null value because we do key repartitioning in empTable.
// But this depends on your use case.
employeeInput.pipeInput(null, alice);
// 3. insert employee to EMPLOYMENT-HISTORY topic.
// Remember: I put key as null value because we do key repartitioning in empTable.
// But this depends on your use case.
employmentHistoryInput.pipeInput(null, historyCompanyA);
employmentHistoryInput.pipeInput(null, historyCompanyB);
// make sure topic is not empty
assertThat(employeeOutput.isEmpty()).isFalse();
// loop until last records, because we cannot predict the left-join behaviour.
// what we now is only the last record should be as what we expected.
KeyValue<Integer, EmployeeResultDto> kv = null;
while(!employeeOutput.isEmpty()) {
kv = employeeOutput.readKeyValue();
}
// make sure kv is not null
assertThat(kv).isNotNull();
assertThat(kv).isEqualTo(new KeyValue<>(1000, expected));
// make sure no record left in the output topic
assertThat(employeeOutput.isEmpty()).isTrue();
}
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams-test-utils</artifactId>
<version>2.5.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>3.18.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.6.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<version>5.6.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>5.6.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.platform</groupId>
<artifactId>junit-platform-launcher</artifactId>
<version>1.6.2</version>
<scope>test</scope>
</dependency>
//WordCountTimeWindowsTopology.java -- see at line 10
public void createTopology(StreamsBuilder builder) {
final KStream<String, String> textLines = builder
.stream("streams-plaintext-input",
Consumed.with(Serdes.String(), Serdes.String()));
textLines
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, value) -> value)
.windowedBy(TimeWindows.of(Duration.ofMinutes(5)).advanceBy(Duration.ofMinutes(5)))
.count(Materialized.as("WordCount"))
.toStream()
.map((key, value) -> new KeyValue<>(key.key(),value))
.to("streams-wordcount-output",
Produced.with(Serdes.String(), Serdes.Long()));
}
// WordCountTimeWindowsTopologyTest.java - see at line 29
@Test
@DisplayName("Test word count streams")
public void testWordCountStream() {
String text1 = "Welcome to kafka streams";
String text2 = "Kafka streams is great";
String text3 = "Welcome back";
// expected output
Map<String,Long> expected = Map.of(
// please take note, now the welcome is only 1.
// Because the second welcome word, come after 5 minutes duration.
"welcome", 1L,
"to", 1L,
"kafka", 2L,
"streams", 2L,
"is", 1L,
"great", 1L,
"back", 1L
);
final Instant now = Instant.now();
// insert two lines with the same timestamp
plainTextInput.pipeInput(null,text1, now);
plainTextInput.pipeInput(null,text2, now);
// simulate 5 minutes after the first two
plainTextInput.pipeInput(null,text3, now.plus(5, ChronoUnit.MINUTES));
assertThat(wordCountOutput.isEmpty()).isFalse();
// result
Map<String, Long> result = new HashMap<>();
while(!wordCountOutput.isEmpty()) {
final KeyValue<String, Long> kv = wordCountOutput.readKeyValue();
result.put(kv.key, kv.value);
}
assertThat(result).containsExactlyInAnyOrderEntriesOf(expected);
assertThat(wordCountOutput.isEmpty()).isTrue();
}
public void createTopology(StreamsBuilder builder) {
final KStream<String, String> textLines = builder
.stream("streams-plaintext-input",
Consumed.with(Serdes.String(), Serdes.String()));
textLines
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, value) -> value)
.count(Materialized.as("WordCount"))
.toStream()
.to("streams-wordcount-output",
Produced.with(Serdes.String(), Serdes.Long()));
}
// WordCountTopologyTest.java
private TopologyTestDriver testDriver;
private TestInputTopic<String, String> plainTextInput;
private TestOutputTopic<String, Long> wordCountOutput;
private final Serde<String> stringSerde = new Serdes.StringSerde();
private final Serde<Long> longSerde = new Serdes.LongSerde();
@BeforeEach
public void init() {
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-id");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, stringSerde.getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, stringSerde.getClass().getName());
WordCountTopology wordCountTopology = new WordCountTopology();
final StreamsBuilder builder = new StreamsBuilder();
wordCountTopology.createTopology(builder);
final Topology topology = builder.build();
testDriver = new TopologyTestDriver(topology, props);
plainTextInput = testDriver.createInputTopic("streams-plaintext-input", stringSerde.serializer(),
stringSerde.serializer());
wordCountOutput = testDriver.createOutputTopic("streams-wordcount-output", stringSerde.deserializer(),
longSerde.deserializer());
}
@AfterEach
public void tearDown() throws IOException {
testDriver.close();
FileUtils.deleteDirectory(new File("/tmp/kafka-streams/app-id"));
}
// WordCountTopologyTest.java
@Test
@DisplayName("Test word count streams")
public void testWordCountStream() {
String text1 = "Welcome to kafka streams";
String text2 = "Kafka streams is great";
String text3 = "Welcome back";
// expected output
Map<String,Long> expected = Map.of(
"welcome", 2L,
"to", 1L,
"kafka", 2L,
"streams", 2L,
"is", 1L,
"great", 1L,
"back", 1L
);
plainTextInput.pipeInput(null,text1);
plainTextInput.pipeInput(null,text2);
plainTextInput.pipeInput(null,text3);
assertThat(wordCountOutput.isEmpty()).isFalse();
// result
Map<String, Long> result = new HashMap<>();
while(!wordCountOutput.isEmpty()) {
final KeyValue<String, Long> kv = wordCountOutput.readKeyValue();
result.put(kv.key, kv.value);
}
assertThat(result).containsExactlyInAnyOrderEntriesOf(expected);
assertThat(wordCountOutput.isEmpty()).isTrue();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment